Merge pull request #1272 from influxdata/crepererum/issue1256

refactor: make chunks per-table
kodiakhq[bot] 2021-04-23 07:23:19 +00:00 committed by GitHub
commit 4086e3373a
23 changed files with 808 additions and 459 deletions

View File

@ -48,6 +48,9 @@ pub struct ChunkSummary {
/// The partition key of this chunk
pub partition_key: Arc<String>,
/// The table of this chunk
pub table_name: Arc<String>,
/// The id of this chunk
pub id: u32,
@ -75,12 +78,14 @@ impl ChunkSummary {
/// Construct a ChunkSummary that has None for all timestamps
pub fn new_without_timestamps(
partition_key: Arc<String>,
table_name: Arc<String>,
id: u32,
storage: ChunkStorage,
estimated_bytes: usize,
) -> Self {
Self {
partition_key,
table_name,
id,
storage,
estimated_bytes,
@ -96,6 +101,7 @@ impl From<ChunkSummary> for management::Chunk {
fn from(summary: ChunkSummary) -> Self {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
estimated_bytes,
@ -112,9 +118,15 @@ impl From<ChunkSummary> for management::Chunk {
let partition_key = match Arc::try_unwrap(partition_key) {
// no one else has a reference so take the string
Ok(partition_key) => partition_key,
// some other refernece exists to this string, so clone it
// some other reference exists to this string, so clone it
Err(partition_key) => partition_key.as_ref().clone(),
};
let table_name = match Arc::try_unwrap(table_name) {
// no one else has a reference so take the string
Ok(table_name) => table_name,
// some other reference exists to this string, so clone it
Err(table_name) => table_name.as_ref().clone(),
};
let time_of_first_write = time_of_first_write.map(|t| t.into());
let time_of_last_write = time_of_last_write.map(|t| t.into());
@ -122,6 +134,7 @@ impl From<ChunkSummary> for management::Chunk {
Self {
partition_key,
table_name,
id,
storage,
estimated_bytes,
@ -181,6 +194,7 @@ impl TryFrom<management::Chunk> for ChunkSummary {
let management::Chunk {
partition_key,
table_name,
id,
estimated_bytes,
..
@ -188,9 +202,11 @@ impl TryFrom<management::Chunk> for ChunkSummary {
let estimated_bytes = estimated_bytes as usize;
let partition_key = Arc::new(partition_key);
let table_name = Arc::new(table_name);
Ok(Self {
partition_key,
table_name,
id,
storage,
estimated_bytes,
@ -226,6 +242,7 @@ mod test {
fn valid_proto_to_summary() {
let proto = management::Chunk {
partition_key: "foo".to_string(),
table_name: "bar".to_string(),
id: 42,
estimated_bytes: 1234,
storage: management::ChunkStorage::ObjectStoreOnly.into(),
@ -237,6 +254,7 @@ mod test {
let summary = ChunkSummary::try_from(proto).expect("conversion successful");
let expected = ChunkSummary {
partition_key: Arc::new("foo".to_string()),
table_name: Arc::new("bar".to_string()),
id: 42,
estimated_bytes: 1234,
storage: ChunkStorage::ObjectStoreOnly,
@ -256,6 +274,7 @@ mod test {
fn valid_summary_to_proto() {
let summary = ChunkSummary {
partition_key: Arc::new("foo".to_string()),
table_name: Arc::new("bar".to_string()),
id: 42,
estimated_bytes: 1234,
storage: ChunkStorage::ObjectStoreOnly,
@ -268,6 +287,7 @@ mod test {
let expected = management::Chunk {
partition_key: "foo".to_string(),
table_name: "bar".to_string(),
id: 42,
estimated_bytes: 1234,
storage: management::ChunkStorage::ObjectStoreOnly.into(),
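
A minimal round-trip sketch of the widened summary type, assuming the data_types paths shown in this diff (values are illustrative, mirroring the tests above): the new table_name field survives the proto conversion in both directions.

use std::convert::TryFrom;
use std::sync::Arc;

let summary = ChunkSummary::new_without_timestamps(
    Arc::new("1970-01-01T00".to_string()),
    Arc::new("cpu".to_string()),
    0,
    ChunkStorage::OpenMutableBuffer,
    1234,
);

// Into the management proto and back; the table name is preserved.
let proto = management::Chunk::from(summary);
assert_eq!(proto.table_name, "cpu");
let summary = ChunkSummary::try_from(proto).expect("conversion successful");
assert_eq!(summary.table_name.as_str(), "cpu");
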

View File

@ -25,6 +25,7 @@ pub enum Job {
CloseChunk {
db_name: String,
partition_key: String,
table_name: String,
chunk_id: u32,
},
@ -32,6 +33,7 @@ pub enum Job {
WriteChunk {
db_name: String,
partition_key: String,
table_name: String,
chunk_id: u32,
},
}
@ -50,19 +52,23 @@ impl From<Job> for management::operation_metadata::Job {
Job::CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
} => Self::CloseChunk(management::CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
}),
Job::WriteChunk {
db_name,
partition_key,
table_name,
chunk_id,
} => Self::WriteChunk(management::WriteChunk {
db_name,
partition_key,
table_name,
chunk_id,
}),
}
@ -84,19 +90,23 @@ impl From<management::operation_metadata::Job> for Job {
Job::CloseChunk(management::CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
}) => Self::CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
},
Job::WriteChunk(management::WriteChunk {
db_name,
partition_key,
table_name,
chunk_id,
}) => Self::WriteChunk {
db_name,
partition_key,
table_name,
chunk_id,
},
}
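
A short sketch of the widened job metadata (the database and table names here are illustrative); the From impls above now thread table_name through both directions of the conversion.

let job = Job::CloseChunk {
    db_name: "mydb".to_string(),
    partition_key: "1970-01-01T00".to_string(),
    table_name: "cpu".to_string(),
    chunk_id: 0,
};

// The management proto representation now carries the table name as well.
let proto = management::operation_metadata::Job::from(job);
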

View File

@ -30,6 +30,9 @@ message Chunk {
// The partition key of this chunk
string partition_key = 1;
// The table of this chunk
string table_name = 8;
// The id of this chunk
uint32 id = 2;

View File

@ -43,6 +43,9 @@ message CloseChunk {
// partition key
string partition_key = 2;
// table name
string table_name = 4;
// chunk_id
uint32 chunk_id = 3;
}
@ -55,6 +58,9 @@ message WriteChunk {
// partition key
string partition_key = 2;
// table name
string table_name = 4;
// chunk_id
uint32 chunk_id = 3;
}

View File

@ -199,6 +199,9 @@ message NewPartitionChunkRequest {
// the partition key
string partition_key = 2;
// the table name
string table_name = 3;
}
message NewPartitionChunkResponse {
@ -212,6 +215,9 @@ message ClosePartitionChunkRequest {
// the partition key
string partition_key = 2;
// the table name
string table_name = 4;
// the chunk id
uint32 chunk_id = 3;
}

View File

@ -465,13 +465,16 @@ impl Client {
&mut self,
db_name: impl Into<String>,
partition_key: impl Into<String>,
table_name: impl Into<String>,
) -> Result<(), NewPartitionChunkError> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
self.inner
.new_partition_chunk(NewPartitionChunkRequest {
db_name,
table_name,
partition_key,
})
.await
@ -512,16 +515,19 @@ impl Client {
&mut self,
db_name: impl Into<String>,
partition_key: impl Into<String>,
table_name: impl Into<String>,
chunk_id: u32,
) -> Result<Operation, ClosePartitionChunkError> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.close_partition_chunk(ClosePartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
})
.await
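
A hedged usage sketch of the updated client methods, assuming an async context with an already-connected management Client (database and table names are illustrative; only the extra table_name argument is new):

// Open a new chunk for the given table, then close chunk 0 of that table.
client
    .new_partition_chunk("mydb", "1970-01-01T00", "cpu")
    .await?;
let operation = client
    .close_partition_chunk("mydb", "1970-01-01T00", "cpu", 0)
    .await?;
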

View File

@ -55,80 +55,111 @@ mod system_tables;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Can not drop chunk {} {} from catalog: {}",
"Can not drop chunk {}:{}:{} from catalog: {}",
partition_key,
table_name,
chunk_id,
source
))]
DroppingChunk {
partition_key: String,
table_name: String,
chunk_id: u32,
source: catalog::Error,
},
#[snafu(display("Can not rollover partition {}: {}", partition_key, source))]
#[snafu(display(
"Can not rollover partition {}:{} : {}",
partition_key,
table_name,
source
))]
RollingOverPartition {
partition_key: String,
table_name: String,
source: catalog::Error,
},
#[snafu(display(
"Internal error: no open chunk while rolling over partition {}",
"Internal error: no open chunk while rolling over partition {}:{}",
partition_key,
table_name,
))]
InternalNoOpenChunk { partition_key: String },
InternalNoOpenChunk {
partition_key: String,
table_name: String,
},
#[snafu(display(
"Can not drop chunk {} {} which is {:?}. Wait for the movement to complete",
"Can not drop chunk {}:{}:{} which is {:?}. Wait for the movement to complete",
partition_key,
table_name,
chunk_id,
chunk_state
))]
DropMovingChunk {
partition_key: String,
table_name: String,
chunk_id: u32,
chunk_state: String,
},
#[snafu(display(
"Can not load partition chunk {} {} to read buffer: {}",
"Can not load partition chunk {}:{}:{} to read buffer: {}",
partition_key,
table_name,
chunk_id,
source
))]
LoadingChunk {
partition_key: String,
table_name: String,
chunk_id: u32,
source: catalog::Error,
},
#[snafu(display(
"Can not load partition chunk {} {} to parquet format in memory: {}",
"Can not load partition chunk {}:{},{} to parquet format in memory: {}",
partition_key,
table_name,
chunk_id,
source
))]
LoadingChunkToParquet {
partition_key: String,
table_name: String,
chunk_id: u32,
source: catalog::Error,
},
#[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))]
#[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))]
ReadBufferChunkError {
source: read_buffer::Error,
table_name: String,
chunk_id: u32,
},
#[snafu(display("Read Buffer Schema Error in chunk {}: {}", chunk_id, source))]
#[snafu(display(
"Read Buffer Schema Error in chunk {}:{} : {}",
chunk_id,
table_name,
source
))]
ReadBufferChunkSchemaError {
source: read_buffer::Error,
table_name: String,
chunk_id: u32,
},
#[snafu(display("Read Buffer Timestamp Error in chunk {}: {}", chunk_id, source))]
#[snafu(display(
"Read Buffer Timestamp Error in chunk {}:{} : {}",
chunk_id,
table_name,
source
))]
ReadBufferChunkTimestampError {
chunk_id: u32,
table_name: String,
source: read_buffer::Error,
},
@ -182,17 +213,17 @@ const STARTING_SEQUENCE: u64 = 1;
/// │ ┌────────────────┐ │
/// │ │ Partition │ │
/// │ └────────────────┘ │
/// │ │ multiple Tables (measurements) │
/// │ ▼ │
/// │ ┌────────────────┐ │
/// │ │ Table │ │
/// │ └────────────────┘ │
/// │ │ one open Chunk │
/// │ │ zero or more closed │
/// │ ▼ Chunks │
/// │ ┌────────────────┐ │
/// │ │ Chunk │ │
/// │ └────────────────┘ │
/// │ │ multiple Tables (measurements) │
/// │ ▼ │
/// │ ┌────────────────┐ │
/// │ │ Table │ │
/// │ └────────────────┘ │
/// │ │ multiple Columns │
/// │ ▼ │
/// │ ┌────────────────┐ │
@ -317,37 +348,53 @@ impl Db {
/// Rolls over the active chunk in the database's specified
/// partition. Returns the previously open (now closed) Chunk
pub async fn rollover_partition(&self, partition_key: &str) -> Result<Arc<DbChunk>> {
let partition = self
.catalog
pub async fn rollover_partition(
&self,
partition_key: &str,
table_name: &str,
) -> Result<Arc<DbChunk>> {
let partition =
self.catalog
.valid_partition(partition_key)
.context(RollingOverPartition { partition_key })?;
.context(RollingOverPartition {
partition_key,
table_name,
})?;
let mut partition = partition.write();
let chunk = partition
.open_chunk()
.context(InternalNoOpenChunk { partition_key })?;
.open_chunk(table_name)
.context(RollingOverPartition {
partition_key,
table_name,
})?
.context(InternalNoOpenChunk {
partition_key,
table_name,
})?;
let mut chunk = chunk.write();
chunk
.set_closing()
.context(RollingOverPartition { partition_key })?;
chunk.set_closing().context(RollingOverPartition {
partition_key,
table_name,
})?;
// make a new chunk to track the newly created chunk in this partition
partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref());
partition.create_open_chunk(table_name, self.memory_registries.mutable_buffer.as_ref());
return Ok(DbChunk::snapshot(&chunk));
}
/// Drops the specified chunk from the catalog and all storage systems
pub fn drop_chunk(&self, partition_key: &str, chunk_id: u32) -> Result<()> {
debug!(%partition_key, %chunk_id, "dropping chunk");
pub fn drop_chunk(&self, partition_key: &str, table_name: &str, chunk_id: u32) -> Result<()> {
debug!(%partition_key, %table_name, %chunk_id, "dropping chunk");
let partition = self
.catalog
.valid_partition(partition_key)
.context(DroppingChunk {
partition_key,
table_name,
chunk_id,
})?;
@ -358,8 +405,11 @@ impl Db {
let chunk_state;
{
let chunk = partition.chunk(chunk_id).context(DroppingChunk {
let chunk = partition
.chunk(table_name, chunk_id)
.context(DroppingChunk {
partition_key,
table_name,
chunk_id,
})?;
let chunk = chunk.read();
@ -374,16 +424,20 @@ impl Db {
!matches!(chunk.state(), ChunkState::Moving(_)),
DropMovingChunk {
partition_key,
table_name,
chunk_id,
chunk_state,
}
);
};
debug!(%partition_key, %chunk_id, %chunk_state, "dropping chunk");
debug!(%partition_key, %table_name, %chunk_id, %chunk_state, "dropping chunk");
partition.drop_chunk(chunk_id).context(DroppingChunk {
partition
.drop_chunk(table_name, chunk_id)
.context(DroppingChunk {
partition_key,
table_name,
chunk_id,
})
}
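
A small sketch of the updated call site (values are illustrative). Chunks are now addressed by the full (partition_key, table_name, chunk_id) triple, and the error displays above render it as partition:table:chunk, e.g. "Can not drop chunk 1970-01-01T00:cpu:0 from catalog: ...".

// Drop chunk 0 of table "cpu" in the given partition.
db.drop_chunk("1970-01-01T00", "cpu", 0).unwrap();
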
@ -401,6 +455,7 @@ impl Db {
pub async fn load_chunk_to_read_buffer(
&self,
partition_key: &str,
table_name: &str,
chunk_id: u32,
) -> Result<Arc<DbChunk>> {
let chunk = {
@ -409,12 +464,16 @@ impl Db {
.valid_partition(partition_key)
.context(LoadingChunk {
partition_key,
table_name,
chunk_id,
})?;
let partition = partition.read();
partition.chunk(chunk_id).context(LoadingChunk {
partition
.chunk(table_name, chunk_id)
.context(LoadingChunk {
partition_key,
table_name,
chunk_id,
})?
};
@ -426,11 +485,12 @@ impl Db {
chunk.set_moving().context(LoadingChunk {
partition_key,
table_name,
chunk_id,
})?
};
info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
info!(%partition_key, %table_name, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
let mut batches = Vec::new();
let table_stats = mb_chunk.table_summaries();
@ -441,7 +501,7 @@ impl Db {
// load tables into the new chunk one by one.
for stats in table_stats {
debug!(%partition_key, %chunk_id, table=%stats.name, "loading table to read buffer");
debug!(%partition_key, %table_name, %chunk_id, table=%stats.name, "loading table to read buffer");
mb_chunk
.table_to_arrow(&mut batches, &stats.name, Selection::All)
// It is probably reasonable to recover from this error
@ -460,10 +520,11 @@ impl Db {
// update the catalog to say we are done processing
chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk {
partition_key,
table_name,
chunk_id,
})?;
debug!(%partition_key, %chunk_id, "chunk marked MOVED. loading complete");
debug!(%partition_key, %table_name, %chunk_id, "chunk marked MOVED. loading complete");
Ok(DbChunk::snapshot(&chunk))
}
@ -471,6 +532,7 @@ impl Db {
pub async fn write_chunk_to_object_store(
&self,
partition_key: &str,
table_name: &str,
chunk_id: u32,
) -> Result<Arc<DbChunk>> {
// Get the chunk from the catalog
@ -480,12 +542,16 @@ impl Db {
.valid_partition(partition_key)
.context(LoadingChunkToParquet {
partition_key,
table_name,
chunk_id,
})?;
let partition = partition.read();
partition.chunk(chunk_id).context(LoadingChunkToParquet {
partition
.chunk(table_name, chunk_id)
.context(LoadingChunkToParquet {
partition_key,
table_name,
chunk_id,
})?
};
@ -499,11 +565,12 @@ impl Db {
.set_writing_to_object_store()
.context(LoadingChunkToParquet {
partition_key,
table_name,
chunk_id,
})?
};
debug!(%partition_key, %chunk_id, "chunk marked WRITING , loading tables into object store");
debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITING , loading tables into object store");
// Get all tables in this chunk
let table_stats = rb_chunk.table_summaries();
@ -522,21 +589,30 @@ impl Db {
);
for stats in table_stats {
debug!(%partition_key, %chunk_id, table=%stats.name, "loading table to object store");
debug!(%partition_key, %table_name, %chunk_id, table=%stats.name, "loading table to object store");
let predicate = read_buffer::Predicate::default();
// Get RecordBatchStream of data from the read buffer chunk
let read_results = rb_chunk
.read_filter(stats.name.as_str(), predicate, Selection::All)
.context(ReadBufferChunkError { chunk_id })?;
.context(ReadBufferChunkError {
table_name,
chunk_id,
})?;
let arrow_schema: ArrowSchemaRef = rb_chunk
.read_filter_table_schema(stats.name.as_str(), Selection::All)
.context(ReadBufferChunkSchemaError { chunk_id })?
.context(ReadBufferChunkSchemaError {
table_name,
chunk_id,
})?
.into();
let time_range = rb_chunk
.table_time_range(stats.name.as_str())
.context(ReadBufferChunkTimestampError { chunk_id })?;
let time_range = rb_chunk.table_time_range(stats.name.as_str()).context(
ReadBufferChunkTimestampError {
table_name,
chunk_id,
},
)?;
let stream: SendableRecordBatchStream = Box::pin(
streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
);
@ -569,10 +645,11 @@ impl Db {
.set_written_to_object_store(parquet_chunk)
.context(LoadingChunkToParquet {
partition_key,
table_name,
chunk_id,
})?;
debug!(%partition_key, %chunk_id, "chunk marked MOVED. Persisting to object store complete");
debug!(%partition_key, %table_name, %chunk_id, "chunk marked MOVED. Persisting to object store complete");
Ok(DbChunk::snapshot(&chunk))
}
@ -582,27 +659,29 @@ impl Db {
pub fn load_chunk_to_read_buffer_in_background(
self: &Arc<Self>,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Job> {
let name = self.rules.read().name.clone();
let (tracker, registration) = self.jobs.register(Job::CloseChunk {
db_name: name.to_string(),
partition_key: partition_key.clone(),
table_name: table_name.clone(),
chunk_id,
});
let captured = Arc::clone(&self);
let task = async move {
debug!(%name, %partition_key, %chunk_id, "background task loading chunk to read buffer");
debug!(%name, %partition_key, %table_name, %chunk_id, "background task loading chunk to read buffer");
let result = captured
.load_chunk_to_read_buffer(&partition_key, chunk_id)
.load_chunk_to_read_buffer(&partition_key, &table_name, chunk_id)
.await;
if let Err(e) = result {
info!(?e, %name, %partition_key, %chunk_id, "background task error loading read buffer chunk");
return Err(e);
}
debug!(%name, %partition_key, %chunk_id, "background task completed closing chunk");
debug!(%name, %partition_key, %table_name, %chunk_id, "background task completed closing chunk");
Ok(())
};
@ -617,27 +696,29 @@ impl Db {
pub fn write_chunk_to_object_store_in_background(
self: &Arc<Self>,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Job> {
let name = self.rules.read().name.clone();
let (tracker, registration) = self.jobs.register(Job::WriteChunk {
db_name: name.to_string(),
partition_key: partition_key.clone(),
table_name: table_name.clone(),
chunk_id,
});
let captured = Arc::clone(&self);
let task = async move {
debug!(%name, %partition_key, %chunk_id, "background task loading chunk to object store");
debug!(%name, %partition_key, %table_name, %chunk_id, "background task loading chunk to object store");
let result = captured
.write_chunk_to_object_store(&partition_key, chunk_id)
.write_chunk_to_object_store(&partition_key, &table_name, chunk_id)
.await;
if let Err(e) = result {
info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk");
return Err(e);
}
debug!(%name, %partition_key, %chunk_id, "background task completed writing chunk to object store");
debug!(%name, %partition_key, %table_name, %chunk_id, "background task completed writing chunk to object store");
Ok(())
};
@ -675,14 +756,19 @@ impl Db {
/// Return table summary information for the given chunk in the specified
/// partition
pub fn table_summaries(&self, partition_key: &str, chunk_id: u32) -> Vec<TableSummary> {
pub fn table_summary(
&self,
partition_key: &str,
table_name: &str,
chunk_id: u32,
) -> Option<TableSummary> {
if let Some(partition) = self.catalog.partition(partition_key) {
let partition = partition.read();
if let Ok(chunk) = partition.chunk(chunk_id) {
return chunk.read().table_summaries();
if let Ok(chunk) = partition.chunk(table_name, chunk_id) {
return chunk.read().table_summary();
}
}
Default::default()
None
}
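
Because a chunk now belongs to exactly one table, the old Vec<TableSummary> accessor becomes an Option; a minimal sketch of the new lookup (values are illustrative):

// None is returned if the partition, table, or chunk is unknown.
if let Some(summary) = db.table_summary("1970-01-01T00", "cpu", 0) {
    assert_eq!(summary.name, "cpu");
}
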
/// Returns the number of iterations of the background worker loop
@ -758,8 +844,16 @@ impl Db {
let mut partition = partition.write();
partition.update_last_write_at();
let chunk = partition.open_chunk().unwrap_or_else(|| {
partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
for table_batch in write.table_batches() {
let chunk = partition
.open_chunk(table_batch.name())
.ok()
.flatten()
.unwrap_or_else(|| {
partition.create_open_chunk(
table_batch.name(),
self.memory_registries.mutable_buffer.as_ref(),
)
});
let mut chunk = chunk.write();
@ -772,7 +866,7 @@ impl Db {
.write_table_batches(
sequenced_entry.clock_value(),
sequenced_entry.writer_id(),
&write.table_batches(),
&[table_batch],
)
.context(WriteEntry {
partition_key,
@ -788,6 +882,7 @@ impl Db {
}
}
}
}
Ok(())
}
@ -853,15 +948,34 @@ impl CatalogProvider for Db {
pub mod test_helpers {
use super::*;
use internal_types::entry::test_helpers::lp_to_entries;
use std::collections::HashSet;
pub fn try_write_lp(db: &Db, lp: &str) -> Result<()> {
/// Try to write line protocol data and return all tables that were written.
pub fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
let entries = lp_to_entries(lp);
entries
.into_iter()
.try_for_each(|entry| db.store_entry(entry))
let mut tables = HashSet::new();
for entry in &entries {
if let Some(writes) = entry.partition_writes() {
for write in writes {
for batch in write.table_batches() {
tables.insert(batch.name().to_string());
}
}
}
}
pub fn write_lp(db: &Db, lp: &str) {
entries
.into_iter()
.try_for_each(|entry| db.store_entry(entry))?;
let mut tables: Vec<_> = tables.into_iter().collect();
tables.sort();
Ok(tables)
}
/// Same as [`try_write_lp`](try_write_lp) but will panic on failure.
pub fn write_lp(db: &Db, lp: &str) -> Vec<String> {
try_write_lp(db, lp).unwrap()
}
}
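
A sketch of the new helper contract: writing line protocol for two measurements now reports both table names, sorted.

let tables = write_lp(&db, "cpu bar=1 10\nmem foo=2 10");
assert_eq!(tables, vec!["cpu".to_string(), "mem".to_string()]);
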
@ -935,7 +1049,7 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10");
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
let mb_chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
assert_eq!(mb_chunk.id(), 0);
let expected = vec![
@ -962,7 +1076,7 @@ mod tests {
assert_batches_sorted_eq!(&expected, &batches);
// And expect that we still get the same thing when data is rolled over again
let chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
let chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
assert_eq!(chunk.id(), 1);
let batches = run_query(db, "select * from cpu").await;
@ -985,7 +1099,7 @@ mod tests {
write_lp(db.as_ref(), &lines.join("\n"));
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
let mb_chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
assert_eq!(mb_chunk.id(), 0);
let expected = vec![
@ -1009,9 +1123,9 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=2 20");
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition(partition_key).await.unwrap();
let mb_chunk = db.rollover_partition(partition_key, "cpu").await.unwrap();
let rb_chunk = db
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
.load_chunk_to_read_buffer(partition_key, "cpu", mb_chunk.id())
.await
.unwrap();
@ -1036,7 +1150,7 @@ mod tests {
assert_table_eq!(&expected, &batches);
// drop, the chunk from the read buffer
db.drop_chunk(partition_key, mb_chunk.id()).unwrap();
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
assert_eq!(
read_buffer_chunk_ids(db.as_ref(), partition_key),
vec![] as Vec<u32>
@ -1063,7 +1177,7 @@ mod tests {
}
#[tokio::test]
async fn write_one_chunk_one_table_to_parquet_file() {
async fn write_one_chunk_to_parquet_file() {
// Test that data can be written into parquet files
// Create an object store with a specified location in a local disk
@ -1081,15 +1195,15 @@ mod tests {
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
let mb_chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
// Move that MB chunk to RB chunk and drop it from MB
let rb_chunk = db
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
.load_chunk_to_read_buffer(partition_key, "cpu", mb_chunk.id())
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let pq_chunk = db
.write_chunk_to_object_store(partition_key, mb_chunk.id())
.write_chunk_to_object_store(partition_key, "cpu", mb_chunk.id())
.await
.unwrap();
@ -1149,114 +1263,6 @@ mod tests {
assert_table_eq!(expected, &content);
}
#[tokio::test]
async fn write_one_chunk_many_tables_to_parquet_files() {
// Test that data can be written into parquet files
// Create an object store with a specified location in a local disk
let root = TempDir::new().unwrap();
let object_store = Arc::new(ObjectStore::new_file(File::new(root.path())));
// Create a DB given a server id, an object store and a db name
let server_id: NonZeroU32 = NonZeroU32::new(10).unwrap();
let db_name = "parquet_test_db";
let db = Arc::new(make_database(server_id, Arc::clone(&object_store), db_name));
// Write some line protocols in Mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "disk ops=1 20");
write_lp(db.as_ref(), "cpu bar=2 20");
//Now mark the MB chunk close
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
// Move that MB chunk to RB chunk and drop it from MB
let rb_chunk = db
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let pq_chunk = db
.write_chunk_to_object_store(partition_key, mb_chunk.id())
.await
.unwrap();
// it should be the same chunk!
assert_eq!(mb_chunk.id(), rb_chunk.id());
assert_eq!(mb_chunk.id(), pq_chunk.id());
// we should have chunks in the mutable buffer, read buffer, and object store
// (Note the currently open chunk is not listed)
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![1]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![0]);
assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
// Verify data written to the parquet files in object store
// First, there must be 2 paths of object store in the catalog
// that represents 2 files
let paths = pq_chunk.object_store_paths();
assert_eq!(paths.len(), 2);
// Check that the path must exist in the object store
let prefix = object_store.new_path();
let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&prefix))
.await
.unwrap();
println!("path_list: {:#?}", path_list);
assert_eq!(path_list.len(), 2);
// Check the content of each path
//
// Root path
let root_path = format!("{:?}", root.path());
let root_path = root_path.trim_matches('"');
for path in path_list {
// Get full string path
let path_string = format!("{}/{}", root_path, path.display());
println!("path: {}", path_string);
// Create External table of this parquet file to get its content in a human
// readable form
// Note: We do not care about escaping quotes here because it is just a test
let sql = format!(
"CREATE EXTERNAL TABLE parquet_table STORED AS PARQUET LOCATION '{}'",
path_string
);
let mut ctx = context::ExecutionContext::new();
let df = ctx.sql(&sql).unwrap();
df.collect().await.unwrap();
// Select data from that table
let sql = "SELECT * FROM parquet_table";
let content = ctx.sql(&sql).unwrap().collect().await.unwrap();
println!("Content: {:?}", content);
let expected = if path_string.contains("cpu") {
// file name: cpu.parquet
vec![
"+-----+-------------------------------+",
"| bar | time |",
"+-----+-------------------------------+",
"| 1 | 1970-01-01 00:00:00.000000010 |",
"| 2 | 1970-01-01 00:00:00.000000020 |",
"+-----+-------------------------------+",
]
} else {
// file name: disk.parquet
vec![
"+-----+-------------------------------+",
"| ops | time |",
"+-----+-------------------------------+",
"| 1 | 1970-01-01 00:00:00.000000020 |",
"+-----+-------------------------------+",
]
};
assert_table_eq!(expected, &content);
}
}
#[tokio::test]
async fn write_updates_last_write_at() {
let db = make_db();
@ -1295,12 +1301,16 @@ mod tests {
// When the chunk is rolled over
let partition_key = "1970-01-01T00";
let chunk_id = db.rollover_partition("1970-01-01T00").await.unwrap().id();
let chunk_id = db
.rollover_partition("1970-01-01T00", "cpu")
.await
.unwrap()
.id();
let after_rollover = Utc::now();
let partition = db.catalog.valid_partition(partition_key).unwrap();
let partition = partition.read();
let chunk = partition.chunk(chunk_id).unwrap();
let chunk = partition.chunk("cpu", chunk_id).unwrap();
let chunk = chunk.read();
println!(
@ -1386,14 +1396,14 @@ mod tests {
);
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
let mb_chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
assert_eq!(mb_chunk.id(), 0);
// add a new chunk in mutable buffer, and move chunk1 (but
// not chunk 0) to read buffer
write_lp(&db, "cpu bar=1 30");
let mb_chunk = db.rollover_partition("1970-01-01T00").await.unwrap();
db.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
let mb_chunk = db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
db.load_chunk_to_read_buffer(partition_key, "cpu", mb_chunk.id())
.await
.unwrap();
@ -1410,12 +1420,19 @@ mod tests {
.map(|summary| {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
estimated_bytes,
..
} = summary;
ChunkSummary::new_without_timestamps(partition_key, id, storage, estimated_bytes)
ChunkSummary::new_without_timestamps(
partition_key,
table_name,
id,
storage,
estimated_bytes,
)
})
.collect::<Vec<_>>();
summaries.sort_unstable();
@ -1428,7 +1445,7 @@ mod tests {
let db = make_db();
write_lp(&db, "cpu bar=1 1");
db.rollover_partition("1970-01-01T00").await.unwrap();
db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000");
@ -1444,6 +1461,7 @@ mod tests {
let expected = vec![ChunkSummary::new_without_timestamps(
to_arc("1970-01-05T15"),
to_arc("cpu"),
0,
ChunkStorage::OpenMutableBuffer,
127,
@ -1472,7 +1490,7 @@ mod tests {
write_lp(&db, "cpu bar=1 1");
let after_first_write = Utc::now();
write_lp(&db, "cpu bar=2 2");
db.rollover_partition("1970-01-01T00").await.unwrap();
db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
let after_close = Utc::now();
let mut chunk_summaries = db.chunk_summaries().unwrap();
@ -1522,20 +1540,20 @@ mod tests {
// get three chunks: one open, one closed in mb and one close in rb
write_lp(&db, "cpu bar=1 1");
db.rollover_partition("1970-01-01T00").await.unwrap();
db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
write_lp(&db, "cpu bar=1,baz=2 2");
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000");
print!("Partitions: {:?}", db.partition_keys().unwrap());
db.load_chunk_to_read_buffer("1970-01-01T00", 0)
db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0)
.await
.unwrap();
print!("Partitions2: {:?}", db.partition_keys().unwrap());
db.rollover_partition("1970-01-05T15").await.unwrap();
db.rollover_partition("1970-01-05T15", "cpu").await.unwrap();
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
fn to_arc(s: &str) -> Arc<String> {
@ -1548,24 +1566,28 @@ mod tests {
let expected = vec![
ChunkSummary::new_without_timestamps(
to_arc("1970-01-01T00"),
to_arc("cpu"),
0,
ChunkStorage::ReadBuffer,
1269,
),
ChunkSummary::new_without_timestamps(
to_arc("1970-01-01T00"),
to_arc("cpu"),
1,
ChunkStorage::OpenMutableBuffer,
121,
),
ChunkSummary::new_without_timestamps(
to_arc("1970-01-05T15"),
to_arc("cpu"),
0,
ChunkStorage::ClosedMutableBuffer,
157,
),
ChunkSummary::new_without_timestamps(
to_arc("1970-01-05T15"),
to_arc("cpu"),
1,
ChunkStorage::OpenMutableBuffer,
159,
@ -1588,12 +1610,16 @@ mod tests {
let db = make_db();
write_lp(&db, "cpu bar=1 1");
let chunk_id = db.rollover_partition("1970-01-01T00").await.unwrap().id();
let chunk_id = db
.rollover_partition("1970-01-01T00", "cpu")
.await
.unwrap()
.id();
write_lp(&db, "cpu bar=2,baz=3.0 2");
write_lp(&db, "mem foo=1 1");
// load a chunk to the read buffer
db.load_chunk_to_read_buffer("1970-01-01T00", chunk_id)
db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", chunk_id)
.await
.unwrap();
@ -1644,14 +1670,6 @@ mod tests {
TableSummary {
name: "mem".into(),
columns: vec![
ColumnSummary {
name: "time".into(),
stats: Statistics::I64(StatValues {
min: 1,
max: 1,
count: 1,
}),
},
ColumnSummary {
name: "foo".into(),
stats: Statistics::F64(StatValues {
@ -1660,6 +1678,14 @@ mod tests {
count: 1,
}),
},
ColumnSummary {
name: "time".into(),
stats: Statistics::I64(StatValues {
min: 1,
max: 1,
count: 1,
}),
},
],
},
],
@ -1691,14 +1717,6 @@ mod tests {
TableSummary {
name: "mem".into(),
columns: vec![
ColumnSummary {
name: "time".into(),
stats: Statistics::I64(StatValues {
min: 400000000000001,
max: 400000000000001,
count: 1,
}),
},
ColumnSummary {
name: "frob".into(),
stats: Statistics::F64(StatValues {
@ -1707,6 +1725,14 @@ mod tests {
count: 1,
}),
},
ColumnSummary {
name: "time".into(),
stats: Statistics::I64(StatValues {
min: 400000000000001,
max: 400000000000001,
count: 1,
}),
},
],
},
],
@ -1784,16 +1810,23 @@ mod tests {
// MB => RB
let partition_key = "1970-01-01T00";
let mb_chunk = db.rollover_partition(partition_key).await.unwrap();
let table_name = "cpu";
let mb_chunk = db
.rollover_partition(partition_key, table_name)
.await
.unwrap();
let rb_chunk = db
.load_chunk_to_read_buffer(partition_key, mb_chunk.id())
.load_chunk_to_read_buffer(partition_key, table_name, mb_chunk.id())
.await
.unwrap();
assert_eq!(mb_chunk.id(), rb_chunk.id());
// RB => OS
let task =
db.write_chunk_to_object_store_in_background(partition_key.to_string(), rb_chunk.id());
let task = db.write_chunk_to_object_store_in_background(
partition_key.to_string(),
table_name.to_string(),
rb_chunk.id(),
);
let t_start = std::time::Instant::now();
while !task.is_complete() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

View File

@ -30,15 +30,23 @@ pub enum Error {
#[snafu(display("unknown partition: {}", partition_key))]
UnknownPartition { partition_key: String },
#[snafu(display("unknown chunk: {}:{}", partition_key, chunk_id))]
#[snafu(display("unknown table: {}:{}", partition_key, table_name))]
UnknownTable {
partition_key: String,
table_name: String,
},
#[snafu(display("unknown chunk: {}:{}:{}", partition_key, table_name, chunk_id))]
UnknownChunk {
partition_key: String,
table_name: String,
chunk_id: u32,
},
#[snafu(display(
"Internal unexpected chunk state for {}:{} during {}. Expected {}, got {}",
"Internal unexpected chunk state for {}:{}:{} during {}. Expected {}, got {}",
partition_key,
table_name,
chunk_id,
operation,
expected,
@ -46,6 +54,7 @@ pub enum Error {
))]
InternalChunkState {
partition_key: String,
table_name: String,
chunk_id: u32,
operation: String,
expected: String,
@ -186,7 +195,7 @@ impl SchemaProvider for Catalog {
self.partitions().for_each(|partition| {
let partition = partition.read();
partition.chunks().for_each(|chunk| {
chunk.read().table_names(&mut names);
names.insert(chunk.read().table_name().to_string());
})
});
@ -202,7 +211,7 @@ impl SchemaProvider for Catalog {
for chunk in partition.chunks() {
let chunk = chunk.read();
if chunk.has_table(table_name) {
if (chunk.table_name() == table_name) && chunk.has_data() {
let chunk = super::DbChunk::snapshot(&chunk);
// This should only fail if the table doesn't exist which isn't possible
@ -275,19 +284,30 @@ mod tests {
let p1 = catalog.get_or_create_partition("p1");
let mut p1 = p1.write();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table2", &registry);
let c1_0 = p1.chunk(0).unwrap();
let c1_0 = p1.chunk("table1", 0).unwrap();
assert_eq!(c1_0.read().table_name(), "table1");
assert_eq!(c1_0.read().key(), "p1");
assert_eq!(c1_0.read().id(), 0);
let c1_1 = p1.chunk(1).unwrap();
let c1_1 = p1.chunk("table1", 1).unwrap();
assert_eq!(c1_1.read().table_name(), "table1");
assert_eq!(c1_1.read().key(), "p1");
assert_eq!(c1_1.read().id(), 1);
let err = p1.chunk(100).unwrap_err();
assert_eq!(err.to_string(), "unknown chunk: p1:100");
let c2_0 = p1.chunk("table2", 0).unwrap();
assert_eq!(c2_0.read().table_name(), "table2");
assert_eq!(c2_0.read().key(), "p1");
assert_eq!(c2_0.read().id(), 0);
let err = p1.chunk("table1", 100).unwrap_err();
assert_eq!(err.to_string(), "unknown chunk: p1:table1:100");
let err = p1.chunk("table3", 0).unwrap_err();
assert_eq!(err.to_string(), "unknown table: p1:table3");
}
#[test]
@ -299,26 +319,39 @@ mod tests {
{
let mut p1 = p1.write();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table2", &registry);
}
let p2 = catalog.get_or_create_partition("p2");
{
let mut p2 = p2.write();
p2.create_open_chunk(&registry);
p2.create_open_chunk("table1", &registry);
}
assert_eq!(
chunk_strings(&catalog),
vec!["Chunk p1:0", "Chunk p1:1", "Chunk p2:0"]
vec![
"Chunk p1:table1:0",
"Chunk p1:table1:1",
"Chunk p1:table2:0",
"Chunk p2:table1:0"
]
);
assert_eq!(
partition_chunk_strings(&catalog, "p1"),
vec!["Chunk p1:0", "Chunk p1:1"]
vec![
"Chunk p1:table1:0",
"Chunk p1:table1:1",
"Chunk p1:table2:0"
]
);
assert_eq!(
partition_chunk_strings(&catalog, "p2"),
vec!["Chunk p2:table1:0"]
);
assert_eq!(partition_chunk_strings(&catalog, "p2"), vec!["Chunk p2:0"]);
}
fn chunk_strings(catalog: &Catalog) -> Vec<String> {
@ -329,7 +362,7 @@ mod tests {
p.chunks()
.map(|c| {
let c = c.read();
format!("Chunk {}:{}", c.key(), c.id())
format!("Chunk {}:{}:{}", c.key(), c.table_name(), c.id())
})
.collect::<Vec<_>>()
.into_iter()
@ -348,7 +381,7 @@ mod tests {
.chunks()
.map(|c| {
let c = c.read();
format!("Chunk {}:{}", c.key(), c.id())
format!("Chunk {}:{}:{}", c.key(), c.table_name(), c.id())
})
.collect();
@ -364,41 +397,55 @@ mod tests {
let p1 = catalog.get_or_create_partition("p1");
{
let mut p1 = p1.write();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table2", &registry);
}
let p2 = catalog.get_or_create_partition("p2");
{
let mut p2 = p2.write();
p2.create_open_chunk(&registry);
p2.create_open_chunk("table1", &registry);
}
assert_eq!(chunk_strings(&catalog).len(), 4);
{
let mut p1 = p1.write();
p1.drop_chunk("table2", 0).unwrap();
p1.chunk("table2", 0).unwrap_err(); // chunk is gone
}
assert_eq!(chunk_strings(&catalog).len(), 3);
{
let mut p1 = p1.write();
p1.drop_chunk(1).unwrap();
p1.chunk(1).unwrap_err(); // chunk is gone
p1.drop_chunk("table1", 1).unwrap();
p1.chunk("table1", 1).unwrap_err(); // chunk is gone
}
assert_eq!(chunk_strings(&catalog).len(), 2);
{
let mut p2 = p1.write();
p2.drop_chunk(0).unwrap();
p2.chunk(0).unwrap_err(); // chunk is gone
p2.drop_chunk("table1", 0).unwrap();
p2.chunk("table1", 0).unwrap_err(); // chunk is gone
}
assert_eq!(chunk_strings(&catalog).len(), 1);
}
#[test]
fn chunk_drop_non_existent_chunk() {
let registry = MemRegistry::new();
let catalog = Catalog::new();
let p3 = catalog.get_or_create_partition("p3");
let mut p3 = p3.write();
let err = p3.drop_chunk(0).unwrap_err();
assert_eq!(err.to_string(), "unknown chunk: p3:0");
p3.create_open_chunk("table1", &registry);
let err = p3.drop_chunk("table2", 0).unwrap_err();
assert_eq!(err.to_string(), "unknown table: p3:table2");
let err = p3.drop_chunk("table1", 1).unwrap_err();
assert_eq!(err.to_string(), "unknown chunk: p3:table1:1");
}
#[test]
@ -410,22 +457,28 @@ mod tests {
{
let mut p1 = p1.write();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table1", &registry);
}
assert_eq!(chunk_strings(&catalog).len(), 2);
assert_eq!(
chunk_strings(&catalog),
vec!["Chunk p1:table1:0", "Chunk p1:table1:1"]
);
{
let mut p1 = p1.write();
p1.drop_chunk(0).unwrap();
p1.drop_chunk("table1", 0).unwrap();
}
assert_eq!(chunk_strings(&catalog).len(), 1);
assert_eq!(chunk_strings(&catalog), vec!["Chunk p1:table1:1"]);
// should be ok to recreate (thought maybe not a great idea)
// should be ok to "re-create", it gets another chunk_id though
{
let mut p1 = p1.write();
p1.create_open_chunk(&registry);
p1.create_open_chunk("table1", &registry);
}
assert_eq!(chunk_strings(&catalog).len(), 2);
assert_eq!(
chunk_strings(&catalog),
vec!["Chunk p1:table1:1", "Chunk p1:table1:2"]
);
}
}

View File

@ -1,4 +1,3 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use chrono::{DateTime, Utc};
@ -63,6 +62,9 @@ pub struct Chunk {
/// What partition does the chunk belong to?
partition_key: Arc<String>,
/// What table does the chunk belong to?
table_name: Arc<String>,
/// The ID of the chunk
id: u32,
@ -87,6 +89,7 @@ macro_rules! unexpected_state {
($SELF: expr, $OP: expr, $EXPECTED: expr, $STATE: expr) => {
InternalChunkState {
partition_key: $SELF.partition_key.as_str(),
table_name: $SELF.table_name.as_str(),
chunk_id: $SELF.id,
operation: $OP,
expected: $EXPECTED,
@ -98,9 +101,15 @@ macro_rules! unexpected_state {
impl Chunk {
/// Create a new chunk in the provided state
pub(crate) fn new(partition_key: impl Into<String>, id: u32, state: ChunkState) -> Self {
pub(crate) fn new(
partition_key: impl Into<String>,
table_name: impl Into<String>,
id: u32,
state: ChunkState,
) -> Self {
Self {
partition_key: Arc::new(partition_key.into()),
table_name: Arc::new(table_name.into()),
id,
state,
time_of_first_write: None,
@ -112,11 +121,12 @@ impl Chunk {
/// Creates a new open chunk
pub(crate) fn new_open(
partition_key: impl Into<String>,
table_name: impl Into<String>,
id: u32,
memory_registry: &MemRegistry,
) -> Self {
let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id, memory_registry));
Self::new(partition_key, id, state)
Self::new(partition_key, table_name, id, state)
}
/// Used for testing
@ -138,6 +148,10 @@ impl Chunk {
self.partition_key.as_ref()
}
pub fn table_name(&self) -> &str {
self.table_name.as_ref()
}
pub fn state(&self) -> &ChunkState {
&self.state
}
@ -181,6 +195,7 @@ impl Chunk {
ChunkSummary {
partition_key: Arc::clone(&self.partition_key),
table_name: Arc::clone(&self.table_name),
id: self.id,
storage,
estimated_bytes,
@ -190,66 +205,77 @@ impl Chunk {
}
}
/// Return TableSummary metadata for each table in this chunk
pub fn table_summaries(&self) -> Vec<TableSummary> {
/// Return TableSummary metadata
///
/// May be `None` if no data is present within the chunk state (also see [`Self::has_data`](Self::has_data)).
pub fn table_summary(&self) -> Option<TableSummary> {
match &self.state {
ChunkState::Invalid => panic!("invalid chunk state"),
ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.table_summaries(),
ChunkState::Moving(chunk) => chunk.table_summaries(),
ChunkState::Moved(chunk) => chunk.table_summaries(),
ChunkState::WritingToObjectStore(chunk) => chunk.table_summaries(),
ChunkState::WrittenToObjectStore(chunk, _) => chunk.table_summaries(),
ChunkState::Open(chunk) | ChunkState::Closing(chunk) => {
let mut summaries = chunk.table_summaries();
assert!(summaries.len() <= 1);
if summaries.len() == 1 {
Some(summaries.remove(0))
} else {
None
}
}
/// Returns true if this chunk contains a table with the provided name
pub fn has_table(&self, table_name: &str) -> bool {
match &self.state {
ChunkState::Invalid => false,
ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.has_table(table_name),
ChunkState::Moving(chunk) => chunk.has_table(table_name),
ChunkState::Moved(chunk) => chunk.has_table(table_name),
ChunkState::WritingToObjectStore(chunk) => chunk.has_table(table_name),
ChunkState::WrittenToObjectStore(chunk, _) => chunk.has_table(table_name),
ChunkState::Moving(chunk) => {
let mut summaries = chunk.table_summaries();
assert!(summaries.len() <= 1);
if summaries.len() == 1 {
Some(summaries.remove(0))
} else {
None
}
}
/// Collects the chunk's table names into `names`
pub fn table_names(&self, names: &mut BTreeSet<String>) {
match &self.state {
ChunkState::Invalid => {}
ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.all_table_names(names),
ChunkState::Moving(chunk) => chunk.all_table_names(names),
ChunkState::Moved(chunk) => {
// TODO - the RB API returns a new set each time, so maybe this
// method should be updated to do the same across the mutable
// buffer.
let rb_names = chunk.all_table_names(names);
for name in rb_names {
names.insert(name);
let mut summaries = chunk.table_summaries();
assert!(summaries.len() <= 1);
if summaries.len() == 1 {
Some(summaries.remove(0))
} else {
None
}
}
ChunkState::WritingToObjectStore(chunk) => {
// TODO - the RB API returns a new set each time, so maybe this
// method should be updated to do the same across the mutable
// buffer.
let rb_names = chunk.all_table_names(names);
for name in rb_names {
names.insert(name);
let mut summaries = chunk.table_summaries();
assert!(summaries.len() <= 1);
if summaries.len() == 1 {
Some(summaries.remove(0))
} else {
None
}
}
ChunkState::WrittenToObjectStore(chunk, _) => {
// TODO - the RB API returns a new set each time, so maybe this
// method should be updated to do the same across the mutable
// buffer.
let rb_names = chunk.all_table_names(names);
for name in rb_names {
names.insert(name);
let mut summaries = chunk.table_summaries();
assert!(summaries.len() <= 1);
if summaries.len() == 1 {
Some(summaries.remove(0))
} else {
None
}
}
}
}
/// Returns true if this chunk contains any real data.
///
/// This is required because some chunk states can be empty (= no schema data at all) which confused the heck out of
/// our query engine.
pub fn has_data(&self) -> bool {
match &self.state {
ChunkState::Invalid => false,
ChunkState::Open(chunk) | ChunkState::Closing(chunk) => {
chunk.has_table(&self.table_name)
}
ChunkState::Moving(chunk) => chunk.has_table(&self.table_name),
ChunkState::Moved(chunk) => chunk.has_table(&self.table_name),
ChunkState::WritingToObjectStore(chunk) => chunk.has_table(&self.table_name),
ChunkState::WrittenToObjectStore(chunk, _) => chunk.has_table(&self.table_name),
}
}
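
The schema provider change earlier in this PR relies on this pair of checks; a condensed sketch of that filter, assuming a chunk read guard and a requested table_name are in scope:

// Only expose chunks that belong to the requested table *and* actually
// hold data; an empty chunk has no schema and would confuse the query engine.
if chunk.table_name() == table_name && chunk.has_data() {
    // safe to snapshot the chunk and hand it to the query engine
}
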
/// Returns an approximation of the amount of process memory consumed by the
/// chunk
pub fn size(&self) -> usize {

View File

@ -4,7 +4,7 @@ use std::{collections::BTreeMap, sync::Arc};
use super::{
chunk::{Chunk, ChunkState},
Result, UnknownChunk,
Result, UnknownChunk, UnknownTable,
};
use chrono::{DateTime, Utc};
use data_types::chunk::ChunkSummary;
@ -21,11 +21,8 @@ pub struct Partition {
/// The partition key
key: String,
/// What the next chunk id is
next_chunk_id: u32,
/// The chunks that make up this partition, indexed by id
chunks: BTreeMap<u32, Arc<RwLock<Chunk>>>,
/// Tables within this partition
tables: BTreeMap<String, PartitionTable>,
/// When this partition was created
created_at: DateTime<Utc>,
@ -54,8 +51,7 @@ impl Partition {
let now = Utc::now();
Self {
key,
next_chunk_id: 0,
chunks: BTreeMap::new(),
tables: BTreeMap::new(),
created_at: now,
last_write_at: now,
}
@ -77,17 +73,28 @@ impl Partition {
}
/// Create a new Chunk in the open state
pub fn create_open_chunk(&mut self, memory_registry: &MemRegistry) -> Arc<RwLock<Chunk>> {
let chunk_id = self.next_chunk_id;
self.next_chunk_id += 1;
pub fn create_open_chunk(
&mut self,
table_name: impl Into<String>,
memory_registry: &MemRegistry,
) -> Arc<RwLock<Chunk>> {
let table_name: String = table_name.into();
let table = self
.tables
.entry(table_name.clone())
.or_insert_with(PartitionTable::new);
let chunk_id = table.next_chunk_id;
table.next_chunk_id += 1;
let chunk = Arc::new(RwLock::new(Chunk::new_open(
&self.key,
table_name,
chunk_id,
memory_registry,
)));
if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
if table.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
// A fundamental invariant has been violated - abort
panic!("chunk already existed with id {}", chunk_id)
}
@ -96,52 +103,82 @@ impl Partition {
}
/// Drop the specified chunk
pub fn drop_chunk(&mut self, chunk_id: u32) -> Result<()> {
match self.chunks.remove(&chunk_id) {
pub fn drop_chunk(&mut self, table_name: impl Into<String>, chunk_id: u32) -> Result<()> {
let table_name = table_name.into();
match self.tables.get_mut(&table_name) {
Some(table) => match table.chunks.remove(&chunk_id) {
Some(_) => Ok(()),
None => UnknownChunk {
partition_key: self.key(),
table_name,
chunk_id,
}
.fail(),
},
None => UnknownTable {
partition_key: self.key(),
table_name,
}
.fail(),
}
}
/// return the first currently open chunk, if any
pub fn open_chunk(&self) -> Option<Arc<RwLock<Chunk>>> {
self.chunks
pub fn open_chunk(&self, table_name: impl Into<String>) -> Result<Option<Arc<RwLock<Chunk>>>> {
let table_name = table_name.into();
match self.tables.get(&table_name) {
Some(table) => Ok(table
.chunks
.values()
.find(|chunk| {
let chunk = chunk.read();
matches!(chunk.state(), ChunkState::Open(_))
})
.cloned()
.cloned()),
None => UnknownTable {
partition_key: self.key(),
table_name,
}
.fail(),
}
}
/// Return an immutable chunk reference by chunk id
pub fn chunk(&self, chunk_id: u32) -> Result<Arc<RwLock<Chunk>>> {
self.chunks.get(&chunk_id).cloned().context(UnknownChunk {
partition_key: self.key(),
chunk_id,
})
}
pub fn chunk(
&self,
table_name: impl Into<String>,
chunk_id: u32,
) -> Result<Arc<RwLock<Chunk>>> {
let table_name = table_name.into();
pub fn chunk_ids(&self) -> impl Iterator<Item = u32> + '_ {
self.chunks.keys().cloned()
match self.tables.get(&table_name) {
Some(table) => table.chunks.get(&chunk_id).cloned().context(UnknownChunk {
partition_key: self.key(),
table_name,
chunk_id,
}),
None => UnknownTable {
partition_key: self.key(),
table_name,
}
.fail(),
}
}
/// Return a iterator over chunks in this partition
pub fn chunks(&self) -> impl Iterator<Item = &Arc<RwLock<Chunk>>> {
self.chunks.values()
self.tables.values().flat_map(|table| table.chunks.values())
}
/// Return a PartitionSummary for this partition
pub fn summary(&self) -> PartitionSummary {
let table_summaries = self
.chunks()
.flat_map(|chunk| {
.filter_map(|chunk| {
let chunk = chunk.read();
chunk.table_summaries()
chunk.table_summary()
})
.collect();
@ -150,6 +187,24 @@ impl Partition {
/// Return chunk summaries for all chunks in this partition
pub fn chunk_summaries(&self) -> impl Iterator<Item = ChunkSummary> + '_ {
self.chunks.values().map(|x| x.read().summary())
self.chunks().map(|x| x.read().summary())
}
}
#[derive(Debug)]
struct PartitionTable {
/// What the next chunk id is
next_chunk_id: u32,
/// The chunks that make up this partition, indexed by id
chunks: BTreeMap<u32, Arc<RwLock<Chunk>>>,
}
impl PartitionTable {
fn new() -> Self {
Self {
next_chunk_id: 0,
chunks: BTreeMap::new(),
}
}
}
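
The upshot of the PartitionTable indirection is that chunk ids are now allocated per table, so two tables in one partition can each own a chunk 0. A sketch mirroring the catalog tests in this PR (MemRegistry and Catalog as imported there):

let registry = MemRegistry::new();
let catalog = Catalog::new();
let p1 = catalog.get_or_create_partition("p1");
let mut p1 = p1.write();

p1.create_open_chunk("table1", &registry);
p1.create_open_chunk("table2", &registry);

// Each table starts its own id sequence at 0.
let c1 = p1.chunk("table1", 0).unwrap();
let c2 = p1.chunk("table2", 0).unwrap();
assert_eq!(c1.read().id(), 0);
assert_eq!(c2.read().id(), 0);
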

View File

@ -71,13 +71,13 @@ trait ChunkMover {
fn is_write_active(&self) -> bool;
/// Starts an operation to move a chunk to the read buffer
fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32);
fn move_to_read_buffer(&mut self, partition_key: String, table_name: String, chunk_id: u32);
/// Starts an operation to write a chunk to the object store
fn write_to_object_store(&mut self, partition_key: String, chunk_id: u32);
fn write_to_object_store(&mut self, partition_key: String, table_name: String, chunk_id: u32);
/// Drops a chunk from the database
fn drop_chunk(&mut self, partition_key: String, chunk_id: u32);
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32);
/// The core policy logic
fn check_for_work(&mut self, now: DateTime<Utc>) {
@ -111,30 +111,33 @@ trait ChunkMover {
chunk_guard.set_closing().expect("cannot close open chunk");
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
move_active = true;
self.move_to_read_buffer(partition_key, chunk_id);
self.move_to_read_buffer(partition_key, table_name, chunk_id);
}
ChunkState::Closing(_) if would_move => {
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
move_active = true;
self.move_to_read_buffer(partition_key, chunk_id);
self.move_to_read_buffer(partition_key, table_name, chunk_id);
}
ChunkState::Moved(_) if would_write => {
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
write_active = true;
self.write_to_object_store(partition_key, chunk_id);
self.write_to_object_store(partition_key, table_name, chunk_id);
}
_ => {}
}
@ -154,13 +157,14 @@ trait ChunkMover {
|| matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _))
{
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
buffer_size =
buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard));
std::mem::drop(chunk_guard);
self.drop_chunk(partition_key, chunk_id)
self.drop_chunk(partition_key, table_name, chunk_id)
}
}
None => {
@ -197,27 +201,29 @@ impl ChunkMover for LifecycleManager {
.unwrap_or(false)
}
fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32) {
fn move_to_read_buffer(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
info!(%partition_key, %chunk_id, "moving chunk to read buffer");
self.move_task = Some(
self.db
.load_chunk_to_read_buffer_in_background(partition_key, chunk_id),
)
self.move_task = Some(self.db.load_chunk_to_read_buffer_in_background(
partition_key,
table_name,
chunk_id,
))
}
fn write_to_object_store(&mut self, partition_key: String, chunk_id: u32) {
fn write_to_object_store(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
info!(%partition_key, %chunk_id, "write chunk to object store");
self.write_task = Some(
self.db
.write_chunk_to_object_store_in_background(partition_key, chunk_id),
)
self.write_task = Some(self.db.write_chunk_to_object_store_in_background(
partition_key,
table_name,
chunk_id,
))
}
fn drop_chunk(&mut self, partition_key: String, chunk_id: u32) {
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
info!(%partition_key, %chunk_id, "dropping chunk");
let _ = self
.db
.drop_chunk(&partition_key, chunk_id)
.drop_chunk(&partition_key, &table_name, chunk_id)
.log_if_error("dropping chunk to free up memory");
}
@ -279,7 +285,7 @@ mod tests {
time_of_first_write: Option<i64>,
time_of_last_write: Option<i64>,
) -> Chunk {
let mut chunk = Chunk::new_open("", id, &MemRegistry::new());
let mut chunk = Chunk::new_open("", "table1", id, &MemRegistry::new());
chunk.set_timestamps(
time_of_first_write.map(from_secs),
time_of_last_write.map(from_secs),
@ -388,7 +394,12 @@ mod tests {
self.write_active
}
fn move_to_read_buffer(&mut self, _: String, chunk_id: u32) {
fn move_to_read_buffer(
&mut self,
_partition_key: String,
_table_name: String,
chunk_id: u32,
) {
let chunk = self
.chunks
.iter()
@ -398,7 +409,12 @@ mod tests {
self.events.push(MoverEvents::Move(chunk_id))
}
fn write_to_object_store(&mut self, _partition_key: String, chunk_id: u32) {
fn write_to_object_store(
&mut self,
_partition_key: String,
_table_name: String,
chunk_id: u32,
) {
let chunk = self
.chunks
.iter()
@ -408,7 +424,7 @@ mod tests {
self.events.push(MoverEvents::Write(chunk_id))
}
fn drop_chunk(&mut self, _: String, chunk_id: u32) {
fn drop_chunk(&mut self, _partition_key: String, _table_name: String, chunk_id: u32) {
self.chunks = self
.chunks
.drain(..)

View File

@ -81,6 +81,7 @@ fn append_time(
fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let mut id = UInt32Builder::new(chunks.len());
let mut partition_key = StringBuilder::new(chunks.len());
let mut table_name = StringBuilder::new(chunks.len());
let mut storage = StringBuilder::new(chunks.len());
let mut estimated_bytes = UInt64Builder::new(chunks.len());
let mut time_of_first_write = TimestampNanosecondBuilder::new(chunks.len());
@ -90,6 +91,7 @@ fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
for chunk in chunks {
id.append_value(chunk.id)?;
partition_key.append_value(chunk.partition_key.as_ref())?;
table_name.append_value(chunk.table_name.as_ref())?;
storage.append_value(chunk.storage.as_str())?;
estimated_bytes.append_value(chunk.estimated_bytes as u64)?;
@ -100,6 +102,7 @@ fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let id = id.finish();
let partition_key = partition_key.finish();
let table_name = table_name.finish();
let storage = storage.finish();
let estimated_bytes = estimated_bytes.finish();
let time_of_first_write = time_of_first_write.finish();
@ -109,6 +112,7 @@ fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let schema = Schema::new(vec![
Field::new("id", id.data_type().clone(), false),
Field::new("partition_key", partition_key.data_type().clone(), false),
Field::new("table_name", table_name.data_type().clone(), false),
Field::new("storage", storage.data_type().clone(), false),
Field::new("estimated_bytes", estimated_bytes.data_type().clone(), true),
Field::new(
@ -129,6 +133,7 @@ fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
vec![
Arc::new(id),
Arc::new(partition_key),
Arc::new(table_name),
Arc::new(storage),
Arc::new(estimated_bytes),
Arc::new(time_of_first_write),
@ -196,7 +201,8 @@ mod tests {
fn test_from_chunk_summaries() {
let chunks = vec![
ChunkSummary {
partition_key: Arc::new("".to_string()),
partition_key: Arc::new("p1".to_string()),
table_name: Arc::new("table1".to_string()),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 23754,
@ -208,7 +214,8 @@ mod tests {
time_closing: None,
},
ChunkSummary {
partition_key: Arc::new("".to_string()),
partition_key: Arc::new("p1".to_string()),
table_name: Arc::new("table1".to_string()),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 23454,
@ -222,12 +229,12 @@ mod tests {
];
let expected = vec![
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| id | partition_key | storage | estimated_bytes | time_of_first_write | time_of_last_write | time_closing |",
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| 0 | | OpenMutableBuffer | 23754 | 1970-01-01 00:00:10 | | |",
"| 0 | | OpenMutableBuffer | 23454 | | 1970-01-01 00:01:20 | |",
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"+----+---------------+------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| id | partition_key | table_name | storage | estimated_bytes | time_of_first_write | time_of_last_write | time_closing |",
"+----+---------------+------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | 23754 | 1970-01-01 00:00:10 | | |",
"| 0 | p1 | table1 | OpenMutableBuffer | 23454 | | 1970-01-01 00:01:20 | |",
"+----+---------------+------------+-------------------+-----------------+---------------------+---------------------+--------------+",
];
let batch = from_chunk_summaries(chunks).unwrap();
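
As the expected output above shows, every row of system.chunks now carries the owning table between partition_key and storage. A query sketch matching the projection used in the SQL tests later in this diff:

SELECT id, partition_key, table_name, storage, estimated_bytes FROM system.chunks;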

View File

@ -596,19 +596,21 @@ impl<M: ConnectionManager> Server<M> {
&self,
db_name: DatabaseName<'_>,
partition_key: impl Into<String>,
table_name: impl Into<String>,
chunk_id: u32,
) -> Result<TaskTracker<Job>> {
let db_name = db_name.to_string();
let name = DatabaseName::new(&db_name).context(InvalidDatabaseName)?;
let partition_key = partition_key.into();
let table_name = table_name.into();
let db = self
.config
.db(&name)
.context(DatabaseNotFound { db_name: &db_name })?;
Ok(db.load_chunk_to_read_buffer_in_background(partition_key, chunk_id))
Ok(db.load_chunk_to_read_buffer_in_background(partition_key, table_name, chunk_id))
}
/// Returns a list of all jobs tracked by this server
@ -1191,13 +1193,17 @@ mod tests {
// start the close (note this is not an async)
let partition_key = "";
let table_name = "cpu";
let db_name_string = db_name.to_string();
let tracker = server.close_chunk(db_name, partition_key, 0).unwrap();
let tracker = server
.close_chunk(db_name, partition_key, table_name, 0)
.unwrap();
let metadata = tracker.metadata();
let expected_metadata = Job::CloseChunk {
db_name: db_name_string,
partition_key: partition_key.to_string(),
table_name: table_name.to_string(),
chunk_id: 0,
};
assert_eq!(metadata, &expected_metadata);

View File

@ -172,8 +172,8 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db();
let data = lp_lines.join("\n");
write_lp(&db, &data);
db.rollover_partition("2020-03-01T00").await.unwrap();
db.rollover_partition("2020-03-02T00").await.unwrap();
db.rollover_partition("2020-03-01T00", "h2o").await.unwrap();
db.rollover_partition("2020-03-02T00", "h2o").await.unwrap();
let scenario2 = DbScenario {
scenario_name:
"Data in 4 partitions, two open chunk and two closed chunks of mutable buffer"
@ -184,10 +184,10 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db();
let data = lp_lines.join("\n");
write_lp(&db, &data);
rollover_and_load(&db, "2020-03-01T00").await;
rollover_and_load(&db, "2020-03-02T00").await;
rollover_and_load(&db, "2020-04-01T00").await;
rollover_and_load(&db, "2020-04-02T00").await;
rollover_and_load(&db, "2020-03-01T00", "h2o").await;
rollover_and_load(&db, "2020-03-02T00", "h2o").await;
rollover_and_load(&db, "2020-04-01T00", "h2o").await;
rollover_and_load(&db, "2020-04-02T00", "h2o").await;
let scenario3 = DbScenario {
scenario_name: "Data in 4 partitions, 4 closed chunks in mutable buffer".into(),
db,

View File

@ -30,6 +30,7 @@ pub struct NoData {}
impl DbSetup for NoData {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let table_name = "cpu";
let db = make_db();
let scenario1 = DbScenario {
scenario_name: "New, Empty Database".into(),
@ -52,19 +53,25 @@ impl DbSetup for NoData {
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data);
// move data out of open chunk
assert_eq!(db.rollover_partition(partition_key).await.unwrap().id(), 0);
assert_eq!(
db.rollover_partition(partition_key, table_name)
.await
.unwrap()
.id(),
0
);
assert_eq!(count_mutable_buffer_chunks(&db), 2);
assert_eq!(count_read_buffer_chunks(&db), 0); // only open chunk
db.load_chunk_to_read_buffer(partition_key, 0)
db.load_chunk_to_read_buffer(partition_key, table_name, 0)
.await
.unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1); // only open chunk
db.drop_chunk(partition_key, 0).unwrap();
db.drop_chunk(partition_key, table_name, 0).unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 0);
@ -85,6 +92,7 @@ pub struct TwoMeasurements {}
impl DbSetup for TwoMeasurements {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
@ -101,6 +109,7 @@ pub struct TwoMeasurementsUnsignedType {}
impl DbSetup for TwoMeasurementsUnsignedType {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"restaurant,town=andover count=40000u 100",
"restaurant,town=reading count=632u 120",
@ -120,6 +129,7 @@ pub struct MultiChunkSchemaMerge {}
impl DbSetup for MultiChunkSchemaMerge {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines1 = vec![
"cpu,region=west user=23.2,system=5.0 100",
"cpu,region=west user=21.0,system=6.0 150",
@ -140,6 +150,7 @@ pub struct TwoMeasurementsManyNulls {}
impl DbSetup for TwoMeasurementsManyNulls {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines1 = vec![
"h2o,state=CA,city=LA,county=LA temp=70.4 100",
"h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250",
@ -265,19 +276,27 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
let db = make_db();
write_lp(&db, data);
db.rollover_partition(partition_key).await.unwrap();
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
}
let scenario2 = DbScenario {
scenario_name: "Data in closed chunk of mutable buffer".into(),
db,
};
let db = make_db();
write_lp(&db, data);
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, &table_name, 0)
.await
.unwrap();
}
let scenario3 = DbScenario {
scenario_name: "Data in read buffer".into(),
db,
@ -307,8 +326,12 @@ pub async fn make_two_chunk_scenarios(
// spread across 2 mutable buffer chunks
let db = make_db();
write_lp(&db, data1);
db.rollover_partition(partition_key).await.unwrap();
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
}
write_lp(&db, data2);
let scenario2 = DbScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
@ -317,11 +340,15 @@ pub async fn make_two_chunk_scenarios(
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db();
write_lp(&db, data1);
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, &table_name, 0)
.await
.unwrap();
}
write_lp(&db, data2);
let scenario3 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
@ -330,18 +357,26 @@ pub async fn make_two_chunk_scenarios(
// in 2 read buffer chunks
let db = make_db();
write_lp(&db, data1);
db.rollover_partition(partition_key).await.unwrap();
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
}
write_lp(&db, data2);
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, 1)
db.load_chunk_to_read_buffer(partition_key, &table_name, 0)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, &table_name, 1)
.await
.unwrap();
}
let scenario4 = DbScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
@ -351,9 +386,11 @@ pub async fn make_two_chunk_scenarios(
}
/// Roll over the mutable buffer and load chunk 0 into the read buffer
pub async fn rollover_and_load(db: &Db, partition_key: &str) {
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
db.rollover_partition(partition_key, table_name)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, table_name, 0)
.await
.unwrap();
}
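
Usage sketch, with the partition and table names these scenarios use:

// Close the "h2o" table's open chunk in this partition and load
// chunk 0 into the read buffer.
rollover_and_load(&db, "2020-03-01T00", "h2o").await;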

View File

@ -223,13 +223,14 @@ async fn sql_select_from_information_schema_columns() {
"| public | iox | o2 | state | 2 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | o2 | temp | 3 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | o2 | time | 4 | | NO | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | estimated_bytes | 3 | | YES | UInt64 | | | | | | | |",
"| public | system | chunks | estimated_bytes | 4 | | YES | UInt64 | | | | | | | |",
"| public | system | chunks | id | 0 | | NO | UInt32 | | | 32 | 2 | | | |",
"| public | system | chunks | partition_key | 1 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | storage | 2 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | time_closing | 6 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_of_first_write | 4 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_of_last_write | 5 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | storage | 3 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | table_name | 2 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | time_closing | 7 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_of_first_write | 5 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_of_last_write | 6 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | columns | column_name | 2 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | system | columns | count | 3 | | YES | UInt64 | | | | | | | |",
"| public | system | columns | partition_key | 0 | | NO | Utf8 | | 2147483647 | | | | | |",
@ -275,15 +276,16 @@ async fn sql_select_from_system_tables() {
// test timestamps, etc)
let expected = vec![
"+----+---------------+-------------------+-----------------+",
"| id | partition_key | storage | estimated_bytes |",
"+----+---------------+-------------------+-----------------+",
"| 0 | 1970-01-01T00 | OpenMutableBuffer | 501 |",
"+----+---------------+-------------------+-----------------+",
"+----+---------------+------------+-------------------+-----------------+",
"| id | partition_key | table_name | storage | estimated_bytes |",
"+----+---------------+------------+-------------------+-----------------+",
"| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 324 |",
"| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 264 |",
"+----+---------------+------------+-------------------+-----------------+",
];
run_sql_test_case!(
TwoMeasurementsManyFieldsOneChunk {},
"SELECT id, partition_key, storage, estimated_bytes from system.chunks",
"SELECT id, partition_key, table_name, storage, estimated_bytes from system.chunks",
&expected
);

View File

@ -232,7 +232,7 @@ pub fn snapshot_chunk<T>(
store: Arc<ObjectStore>,
partition_key: &str,
chunk: Arc<T>,
table_stats: Vec<TableSummary>,
table_stats: TableSummary,
notify: Option<oneshot::Sender<()>>,
) -> Result<Arc<Snapshot<T>>>
where
@ -244,7 +244,7 @@ where
data_path,
store,
chunk,
table_stats,
vec![table_stats],
);
let snapshot = Arc::new(snapshot);
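
snapshot_chunk therefore snapshots a single table per call: the caller passes one TableSummary, which is wrapped in a Vec internally. A call sketch matching the updated test below:

// Sketch: summarize one table's chunk, then snapshot it.
let table_summary = db
    .table_summary("1970-01-01T00", "cpu", chunk.id())
    .unwrap();
let snapshot = snapshot_chunk(
    metadata_path,
    data_path,
    Arc::clone(&store),
    "testaroo",
    chunk,
    table_summary,
    None, // no completion notification in this sketch
)
.unwrap();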
@ -288,7 +288,6 @@ mod tests {
cpu,host=A,region=west user=23.2,system=55.1 1
cpu,host=A,region=west user=3.2,system=50.1 10
cpu,host=B,region=east user=10.0,system=74.1 1
mem,host=A,region=west used=45 1
"#;
let db = make_db();
@ -303,7 +302,9 @@ mem,host=A,region=west used=45 1
data_path.push_dir("data");
let chunk = Arc::clone(&db.chunks("1970-01-01T00")[0]);
let table_summaries = db.table_summaries("1970-01-01T00", chunk.id());
let table_summary = db
.table_summary("1970-01-01T00", "cpu", chunk.id())
.unwrap();
let snapshot = snapshot_chunk(
metadata_path.clone(),
@ -311,7 +312,7 @@ mem,host=A,region=west used=45 1
Arc::clone(&store),
"testaroo",
chunk,
table_summaries,
table_summary,
Some(tx),
)
.unwrap();

View File

@ -85,6 +85,9 @@ struct NewChunk {
/// The partition key
partition_key: String,
/// The table name
table_name: String,
}
/// Closes a chunk in the mutable buffer for writing and starts its migration to the read buffer
@ -97,6 +100,9 @@ struct CloseChunk {
/// The partition key
partition_key: String,
/// The table name
table_name: String,
/// The chunk id
chunk_id: u32,
}
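
Both commands therefore take a new table-name positional argument. Invocation sketch (the leading subcommand path is elided here; the argument order matches the CLI tests later in this diff):

... new-chunk <db_name> <partition_key> <table_name>
... close-chunk <db_name> <partition_key> <table_name> <chunk_id>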
@ -168,21 +174,25 @@ pub async fn command(url: String, config: Config) -> Result<()> {
let NewChunk {
db_name,
partition_key,
table_name,
} = new_chunk;
// Ignore response for now
client.new_partition_chunk(db_name, partition_key).await?;
client
.new_partition_chunk(db_name, partition_key, table_name)
.await?;
println!("Ok");
}
Command::CloseChunk(close_chunk) => {
let CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
} = close_chunk;
let operation: Operation = client
.close_partition_chunk(db_name, partition_key, chunk_id)
.close_partition_chunk(db_name, partition_key, table_name, chunk_id)
.await?
.try_into()?;

View File

@ -657,6 +657,7 @@ struct SnapshotInfo {
org: String,
bucket: String,
partition: String,
table_name: String,
}
#[tracing::instrument(level = "debug")]
@ -691,8 +692,14 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
data_path.push_all_dirs(&["data", &snapshot.partition]);
let partition_key = &snapshot.partition;
let chunk = db.rollover_partition(partition_key).await.unwrap();
let table_stats = db.table_summaries(partition_key, chunk.id());
let table_name = &snapshot.table_name;
let chunk = db
.rollover_partition(partition_key, table_name)
.await
.unwrap();
let table_stats = db
.table_summary(partition_key, table_name, chunk.id())
.unwrap();
let snapshot = server::snapshot::snapshot_chunk(
metadata_path,
data_path,

View File

@ -59,12 +59,22 @@ pub fn default_catalog_error_handler(error: server::db::catalog::Error) -> tonic
..Default::default()
}
.into(),
Error::UnknownTable {
partition_key,
table_name,
} => NotFound {
resource_type: "table".to_string(),
resource_name: format!("{}:{}", partition_key, table_name),
..Default::default()
}
.into(),
Error::UnknownChunk {
partition_key,
table_name,
chunk_id,
} => NotFound {
resource_type: "chunk".to_string(),
resource_name: format!("{}:{}", partition_key, chunk_id),
resource_name: format!("{}:{}:{}", partition_key, table_name, chunk_id),
..Default::default()
}
.into(),

View File

@ -329,6 +329,7 @@ where
let NewPartitionChunkRequest {
db_name,
partition_key,
table_name,
} = request.into_inner();
let db_name = DatabaseName::new(db_name).field("db_name")?;
@ -338,7 +339,7 @@ where
..Default::default()
})?;
db.rollover_partition(&partition_key)
db.rollover_partition(&partition_key, &table_name)
.await
.map_err(default_db_error_handler)?;
@ -352,6 +353,7 @@ where
let ClosePartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
} = request.into_inner();
@ -360,7 +362,7 @@ where
let tracker = self
.server
.close_chunk(db_name, partition_key, chunk_id)
.close_chunk(db_name, partition_key, table_name, chunk_id)
.map_err(default_server_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);

View File

@ -275,6 +275,7 @@ async fn test_chunk_get() {
let expected: Vec<Chunk> = vec![
Chunk {
partition_key: "cpu".into(),
table_name: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 161,
@ -284,6 +285,7 @@ async fn test_chunk_get() {
},
Chunk {
partition_key: "disk".into(),
table_name: "disk".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 127,
@ -450,6 +452,7 @@ async fn test_list_partition_chunks() {
let expected: Vec<Chunk> = vec![Chunk {
partition_key: "cpu".into(),
table_name: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 161,
@ -505,10 +508,11 @@ async fn test_new_partition_chunk() {
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
let partition_key = "cpu";
let table_name = "cpu";
// Roll over a second chunk
management_client
.new_partition_chunk(&db_name, partition_key)
.new_partition_chunk(&db_name, partition_key, table_name)
.await
.expect("new partition chunk");
@ -538,7 +542,7 @@ async fn test_new_partition_chunk() {
// Rolling over a (currently non-existent) partition is not OK
let err = management_client
.new_partition_chunk(&db_name, "non_existent_partition")
.new_partition_chunk(&db_name, "non_existent_partition", table_name)
.await
.expect_err("new partition chunk");
@ -546,6 +550,17 @@ async fn test_new_partition_chunk() {
"Resource partition/non_existent_partition not found",
err.to_string()
);
// Rolling over a (currently non-existent) table in an existing partition is not OK
let err = management_client
.new_partition_chunk(&db_name, partition_key, "non_existing_table")
.await
.expect_err("new partition chunk");
assert_eq!(
"Resource table/cpu:non_existing_table not found",
err.to_string()
);
}
#[tokio::test]
@ -554,7 +569,11 @@ async fn test_new_partition_chunk_error() {
let mut management_client = fixture.management_client();
let err = management_client
.new_partition_chunk("this database does not exist", "nor_does_this_partition")
.new_partition_chunk(
"this database does not exist",
"nor_does_this_partition",
"nor_does_this_table",
)
.await
.expect_err("expected error");
@ -578,6 +597,7 @@ async fn test_close_partition_chunk() {
create_readable_database(&db_name, fixture.grpc_channel()).await;
let partition_key = "cpu";
let table_name = "cpu";
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
@ -596,7 +616,7 @@ async fn test_close_partition_chunk() {
// Move the chunk to read buffer
let operation = management_client
.close_partition_chunk(&db_name, partition_key, 0)
.close_partition_chunk(&db_name, partition_key, table_name, 0)
.await
.expect("new partition chunk");
@ -640,7 +660,12 @@ async fn test_close_partition_chunk_error() {
let mut management_client = fixture.management_client();
let err = management_client
.close_partition_chunk("this database does not exist", "nor_does_this_partition", 0)
.close_partition_chunk(
"this database does not exist",
"nor_does_this_partition",
"nor_does_this_table",
0,
)
.await
.expect_err("expected error");
@ -701,6 +726,7 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
.map(|summary| {
let Chunk {
partition_key,
table_name,
id,
storage,
estimated_bytes,
@ -708,6 +734,7 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
} = summary;
Chunk {
partition_key,
table_name,
id,
storage,
estimated_bytes,

View File

@ -406,6 +406,7 @@ async fn test_list_partition_chunks() {
let expected = r#"
"partition_key": "cpu",
"table_name": "cpu",
"id": 0,
"storage": "OpenMutableBuffer",
"#;
@ -472,6 +473,7 @@ async fn test_new_partition_chunk() {
.arg("new-chunk")
.arg(&db_name)
.arg("cpu")
.arg("cpu")
.arg("--host")
.arg(addr)
.assert()
@ -504,6 +506,7 @@ async fn test_new_partition_chunk_error() {
.arg("new-chunk")
.arg("non_existent_database")
.arg("non_existent_partition")
.arg("non_existent_table")
.arg("--host")
.arg(addr)
.assert()
@ -532,6 +535,7 @@ async fn test_close_partition_chunk() {
.arg("close-chunk")
.arg(&db_name)
.arg("cpu")
.arg("cpu")
.arg("0")
.arg("--host")
.arg(addr)
@ -545,6 +549,7 @@ async fn test_close_partition_chunk() {
let expected_job = Job::CloseChunk {
db_name,
partition_key: "cpu".into(),
table_name: "cpu".into(),
chunk_id: 0,
};
@ -568,6 +573,7 @@ async fn test_close_partition_chunk_error() {
.arg("close-chunk")
.arg("non_existent_database")
.arg("non_existent_partition")
.arg("non_existent_table")
.arg("0")
.arg("--host")
.arg(addr)