Merge branch 'main' into er/refactor/encodings

Edd Robinson 2021-05-27 21:18:30 +01:00 committed by GitHub
commit 840e40cbf4
8 changed files with 75 additions and 71 deletions

View File

@@ -102,7 +102,7 @@ impl Chunk {
     }
     /// Returns the summary statistics for this chunk
-    pub fn table_summary(&self) -> TableSummary {
+    pub fn table_summary(&self) -> &Arc<TableSummary> {
         self.table.table_summary()
     }
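The hunk above changes Chunk::table_summary() to hand out a shared &Arc<TableSummary> instead of an owned copy. As a rough sketch of the resulting calling pattern (Summary and Chunk below are stand-in types, not the crate's real API):

    use std::sync::Arc;

    // Stand-in for the crate's TableSummary; the field is illustrative only.
    #[derive(Debug)]
    struct Summary {
        row_count: usize,
    }

    struct Chunk {
        summary: Arc<Summary>,
    }

    impl Chunk {
        // Returning &Arc lets callers either borrow the summary or take a
        // cheap reference-counted handle instead of deep-copying it.
        fn table_summary(&self) -> &Arc<Summary> {
            &self.summary
        }
    }

    fn main() {
        let chunk = Chunk { summary: Arc::new(Summary { row_count: 42 }) };
        let borrowed: &Summary = chunk.table_summary().as_ref(); // borrow only
        let shared: Arc<Summary> = Arc::clone(chunk.table_summary()); // bump the refcount
        println!("{} {}", borrowed.row_count, shared.row_count);
    }

Callers that only need to read the statistics borrow via as_ref() (as the updated metadata tests below do), while callers that keep the summary around clone the Arc, which copies a pointer rather than the statistics themselves.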

View File

@@ -518,8 +518,8 @@ mod tests {
         let table_summary_actual =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summary();
-        assert_eq!(table_summary_actual, table_summary_expected);
+        let table_summary_expected = chunk.table_summary().as_ref();
+        assert_eq!(&table_summary_actual, table_summary_expected);
     }
     #[tokio::test]
@@ -541,8 +541,8 @@ mod tests {
         let table_summary_actual =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summary();
-        assert_eq!(table_summary_actual, table_summary_expected);
+        let table_summary_expected = chunk.table_summary().as_ref();
+        assert_eq!(&table_summary_actual, table_summary_expected);
     }
     #[tokio::test]

View File

@@ -31,7 +31,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[derive(Debug, Clone)]
 pub struct Table {
     /// Meta data of the table
-    table_summary: TableSummary,
+    table_summary: Arc<TableSummary>,
     /// Path in the object store. Format:
     /// <writer id>/<database>/data/<partition key>/<chunk
@@ -59,7 +59,7 @@ impl Table {
         let timestamp_range = extract_range(&table_summary);
         Self {
-            table_summary,
+            table_summary: Arc::new(table_summary),
             object_store_path: path,
             object_store: store,
             table_schema: schema,
@@ -67,8 +67,8 @@ impl Table {
         }
     }
-    pub fn table_summary(&self) -> TableSummary {
-        self.table_summary.clone()
+    pub fn table_summary(&self) -> &Arc<TableSummary> {
+        &self.table_summary
     }
     pub fn has_table(&self, table_name: &str) -> bool {

View File

@@ -577,20 +577,19 @@ impl Db {
         // update the catalog to say we are processing this chunk and
         // then drop the lock while we do the work
-        let mb_chunk = {
+        let (mb_chunk, table_summary) = {
             let mut chunk = chunk.write();
-            chunk.set_moving(tracker).context(LoadingChunk {
+            let mb_chunk = chunk.set_moving(tracker).context(LoadingChunk {
                 partition_key,
                 table_name,
                 chunk_id,
-            })?
+            })?;
+            (mb_chunk, chunk.table_summary())
         };
         info!(%partition_key, %table_name, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
-        let table_summary = mb_chunk.table_summary();
         // create a new read buffer chunk with memory tracking
         let metrics = self
             .metrics_registry
@@ -662,23 +661,23 @@ impl Db {
         // update the catalog to say we are processing this chunk and
         // then drop the lock while we do the work
-        let rb_chunk = {
+        let (rb_chunk, table_summary) = {
             let mut chunk = chunk.write();
-            chunk
-                .set_writing_to_object_store(tracker)
-                .context(LoadingChunkToParquet {
-                    partition_key,
-                    table_name,
-                    chunk_id,
-                })?
+            let rb_chunk =
+                chunk
+                    .set_writing_to_object_store(tracker)
+                    .context(LoadingChunkToParquet {
+                        partition_key,
+                        table_name,
+                        chunk_id,
+                    })?;
+            (rb_chunk, chunk.table_summary())
         };
         debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITING , loading tables into object store");
-        // Get all tables in this chunk
-        let table_stats = rb_chunk.table_summaries();
         // Create a storage to save data of this chunk
         let storage = Storage::new(
             Arc::clone(&self.store),
@@ -686,24 +685,21 @@
             self.rules.read().name.to_string(),
         );
-        // as of now, there should be exactly 1 table in the chunk (see https://github.com/influxdata/influxdb_iox/issues/1295)
-        assert_eq!(table_stats.len(), 1);
-        let stats = &table_stats[0];
-        debug!(%partition_key, %table_name, %chunk_id, table=%stats.name, "loading table to object store");
+        let table_name = table_summary.name.as_str();
+        debug!(%partition_key, %table_name, %chunk_id, table=table_name, "loading table to object store");
         let predicate = read_buffer::Predicate::default();
         // Get RecordBatchStream of data from the read buffer chunk
         let read_results = rb_chunk
-            .read_filter(stats.name.as_str(), predicate, Selection::All)
+            .read_filter(table_name, predicate, Selection::All)
             .context(ReadBufferChunkError {
                 table_name,
                 chunk_id,
             })?;
         let arrow_schema: ArrowSchemaRef = rb_chunk
-            .read_filter_table_schema(stats.name.as_str(), Selection::All)
+            .read_filter_table_schema(table_name, Selection::All)
             .context(ReadBufferChunkSchemaError {
                 table_name,
                 chunk_id,
@@ -727,7 +723,7 @@ impl Db {
             .write_to_object_store(
                 partition_key.to_string(),
                 chunk_id,
-                stats.name.to_string(),
+                table_name.to_string(),
                 stream,
                 metadata,
             )
@@ -909,7 +905,7 @@ impl Db {
         partition_key: &str,
         table_name: &str,
         chunk_id: u32,
-    ) -> Option<TableSummary> {
+    ) -> Option<Arc<TableSummary>> {
         if let Some(partition) = self.catalog.state().partition(partition_key) {
             let partition = partition.read();
             if let Ok(chunk) = partition.chunk(table_name, chunk_id) {
@@ -1482,7 +1478,7 @@ mod tests {
             .eq(1.0)
             .unwrap();
-        let expected_parquet_size = 807;
+        let expected_parquet_size = 727;
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
         // now also in OS
         catalog_chunk_size_bytes_metric_eq(
@@ -1838,7 +1834,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2405.0)
+            .sample_sum_eq(2325.0)
             .unwrap();
         // it should be the same chunk!
@@ -1853,15 +1849,14 @@ mod tests {
         // Verify data written to the parquet file in object store
         //
         // First, there must be one path of object store in the catalog
-        let paths = pq_chunk.object_store_paths();
-        assert_eq!(paths.len(), 1);
+        let path = pq_chunk.object_store_path().unwrap();
         // Check that the path must exist in the object store
-        let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&paths[0]))
+        let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path))
             .await
             .unwrap();
         assert_eq!(path_list.len(), 1);
-        assert_eq!(path_list, paths.clone());
+        assert_eq!(path_list[0], path);
         // Now read data from that path
         let parquet_data = load_parquet_from_store_for_path(&path_list[0], object_store)
@@ -1947,7 +1942,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2405.0)
+            .sample_sum_eq(2325.0)
             .unwrap();
         // Unload RB chunk but keep it in OS
@@ -1975,22 +1970,21 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(807.0)
+            .sample_sum_eq(727.0)
             .unwrap();
         // Verify data written to the parquet file in object store
         //
         // First, there must be one path of object store in the catalog
-        let paths = pq_chunk.object_store_paths();
-        assert_eq!(paths.len(), 1);
+        let path = pq_chunk.object_store_path().unwrap();
         // Check that the path must exist in the object store
-        let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&paths[0]))
+        let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path))
             .await
             .unwrap();
         println!("path_list: {:#?}", path_list);
         assert_eq!(path_list.len(), 1);
-        assert_eq!(path_list, paths.clone());
+        assert_eq!(path_list[0], path);
         // Now read data from that path
         let parquet_data = load_parquet_from_store_for_path(&path_list[0], object_store)
@@ -2336,7 +2330,7 @@ mod tests {
                 Arc::from("cpu"),
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
-                2396, // size of RB and OS chunks
+                2316, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2391,7 +2385,7 @@ mod tests {
         );
         assert_eq!(
             db.catalog.state().metrics().memory().parquet().get_total(),
-            807
+            727
         );
     }
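The db.rs hunks above gather the chunk's table summary inside the same write-lock scope that transitions the chunk's state, and only release the lock once everything the long-running move/persist work needs has been captured. A minimal sketch of that "grab what you need under the lock, then drop it" shape, using toy types rather than the real catalog:

    use std::sync::{Arc, RwLock};

    // Toy stand-ins for the catalog chunk and its summary.
    struct Summary {
        name: String,
    }

    struct CatalogChunk {
        summary: Arc<Summary>,
    }

    impl CatalogChunk {
        fn table_summary(&self) -> Arc<Summary> {
            Arc::clone(&self.summary)
        }
    }

    fn main() {
        let chunk = Arc::new(RwLock::new(CatalogChunk {
            summary: Arc::new(Summary { name: "cpu".to_string() }),
        }));

        // Take everything the later work needs while the write lock is held,
        // then let the guard drop before doing that work. (The real code also
        // flips the chunk's state inside this block.)
        let table_summary = {
            let guard = chunk.write().unwrap();
            guard.table_summary()
        }; // lock released here

        println!("moving table {}", table_summary.name);
    }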

View File

@@ -101,6 +101,10 @@ pub struct Chunk {
     /// The metrics for this chunk
     metrics: ChunkMetrics,
+    /// The TableSummary, including statistics, for the table in this
+    /// Chunk, if known
+    table_summary: Option<Arc<TableSummary>>,
     /// Time at which the first data was written into this chunk. Note
     /// this is not the same as the timestamps on the data itself
     time_of_first_write: Option<DateTime<Utc>>,
@@ -170,7 +174,7 @@ impl Chunk {
         );
         let table_name = Arc::clone(&chunk.table_name());
+        let table_summary = None;
         let state = ChunkState::Open(chunk);
         metrics
             .state
@@ -183,6 +187,7 @@ impl Chunk {
             state,
             lifecycle_action: None,
             metrics,
+            table_summary,
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
@@ -208,6 +213,8 @@ impl Chunk {
                 .expect("chunk must have exactly 1 table")
                 .as_ref(),
         );
+        // Cache table summary
+        let table_summary = Some(Arc::clone(chunk.table_summary()));
         let state = ChunkState::ObjectStoreOnly(chunk);
         Self {
@@ -217,6 +224,7 @@ impl Chunk {
             state,
             lifecycle_action: None,
             metrics,
+            table_summary,
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
@@ -343,22 +351,20 @@ impl Chunk {
     }
     /// Return the summary information about the table stored in this Chunk
-    pub fn table_summary(&self) -> TableSummary {
+    pub fn table_summary(&self) -> Arc<TableSummary> {
         match &self.state {
-            ChunkState::Invalid => panic!("invalid chunk state"),
-            ChunkState::Open(chunk) => chunk.table_summary(),
-            ChunkState::Closed(chunk) => chunk.table_summary(),
-            ChunkState::Moved(chunk) => {
-                let mut summaries = chunk.table_summaries();
-                assert_eq!(summaries.len(), 1);
-                summaries.remove(0)
+            ChunkState::Invalid => panic!("Chunk in invalid state"),
+            ChunkState::Open(chunk) => {
+                // The stats for open chunks change so can't be cached
+                Arc::new(chunk.table_summary())
             }
-            ChunkState::WrittenToObjectStore(chunk, _) => {
-                let mut summaries = chunk.table_summaries();
-                assert_eq!(summaries.len(), 1);
-                summaries.remove(0)
+            _ => {
+                let table_summary = self
+                    .table_summary
+                    .as_ref()
+                    .expect("Table summary not set for non open chunk");
+                Arc::clone(table_summary)
             }
-            ChunkState::ObjectStoreOnly(chunk) => chunk.table_summary(),
         }
     }
@@ -407,6 +413,8 @@ impl Chunk {
                     .immutable_chunk_size
                     .observe_with_labels(chunk.size() as f64, &[KeyValue::new("state", "closed")]);
+                // Cache table summary
+                self.table_summary = Some(Arc::new(chunk.table_summary()));
                 Ok(s)
             }
             state => {
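The catalog hunks above add an Option<Arc<TableSummary>> field that is filled in once a chunk leaves the Open state, when its statistics stop changing, and handed out cheaply afterwards; only open chunks still compute a summary on demand. A condensed sketch of that caching idea with simplified types (State, Summary, and close() below are stand-ins for the real ChunkState machinery):

    use std::sync::Arc;

    #[derive(Debug)]
    struct Summary {
        rows: usize,
    }

    enum State {
        Open { rows: usize },
        Closed,
    }

    struct Chunk {
        state: State,
        // Populated when the chunk leaves the Open state; None while open.
        cached_summary: Option<Arc<Summary>>,
    }

    impl Chunk {
        fn close(&mut self) {
            if let State::Open { rows } = self.state {
                // Statistics are frozen from here on, so cache them once.
                self.cached_summary = Some(Arc::new(Summary { rows }));
                self.state = State::Closed;
            }
        }

        fn table_summary(&self) -> Arc<Summary> {
            match &self.state {
                // Open chunks keep changing, so compute a fresh summary.
                State::Open { rows } => Arc::new(Summary { rows: *rows }),
                // Every other state hands out the cached, shared summary.
                _ => Arc::clone(self.cached_summary.as_ref().expect("summary cached on close")),
            }
        }
    }

    fn main() {
        let mut chunk = Chunk { state: State::Open { rows: 3 }, cached_summary: None };
        println!("{:?}", chunk.table_summary());
        chunk.close();
        println!("{:?}", chunk.table_summary());
    }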

View File

@@ -256,7 +256,8 @@ impl Partition {
                 let chunk = chunk.read();
                 UnaggregatedTableSummary {
                     chunk_id: chunk.id(),
-                    table: chunk.table_summary(),
+                    // Note: makes a deep copy of the TableSummary
+                    table: chunk.table_summary().as_ref().clone(),
                 }
             })
             .collect();

View File

@@ -146,11 +146,12 @@ impl DbChunk {
         })
     }
-    /// Return object store paths
-    pub fn object_store_paths(&self) -> Vec<Path> {
+    /// Return the Path in ObjectStorage where this chunk is
+    /// persisted, if any
+    pub fn object_store_path(&self) -> Option<Path> {
         match &self.state {
-            State::ParquetFile { chunk } => vec![chunk.table_path()],
-            _ => vec![],
+            State::ParquetFile { chunk } => Some(chunk.table_path()),
+            _ => None,
         }
     }
 }

View File

@@ -321,10 +321,10 @@ async fn sql_select_from_system_chunk_columns() {
         "+---------------+----------+------------+--------------+-------------------+-------+-----------+-----------+-----------------+",
         "| partition_key | chunk_id | table_name | column_name  | storage           | count | min_value | max_value | estimated_bytes |",
         "+---------------+----------+------------+--------------+-------------------+-------+-----------+-----------+-----------------+",
-        "| 1970-01-01T00 | 0        | h2o        | city         | ReadBuffer        | 2     | Boston    | time      | 585             |",
-        "| 1970-01-01T00 | 0        | h2o        | other_temp   | ReadBuffer        | 2     | 70.4      | 70.4      | 369             |",
-        "| 1970-01-01T00 | 0        | h2o        | state        | ReadBuffer        | 2     | Boston    | time      | 585             |",
-        "| 1970-01-01T00 | 0        | h2o        | temp         | ReadBuffer        | 2     | 70.4      | 70.4      | 369             |",
+        "| 1970-01-01T00 | 0        | h2o        | city         | ReadBuffer        | 2     | Boston    | Boston    | 585             |",
+        "| 1970-01-01T00 | 0        | h2o        | other_temp   | ReadBuffer        | 1     | 70.4      | 70.4      | 369             |",
+        "| 1970-01-01T00 | 0        | h2o        | state        | ReadBuffer        | 2     | MA        | MA        | 585             |",
+        "| 1970-01-01T00 | 0        | h2o        | temp         | ReadBuffer        | 1     | 70.4      | 70.4      | 369             |",
         "| 1970-01-01T00 | 0        | h2o        | time         | ReadBuffer        | 2     | 50        | 250       | 51              |",
         "| 1970-01-01T00 | 0        | o2         | __dictionary | OpenMutableBuffer |       |           |           | 112             |",
        "| 1970-01-01T00 | 0        | o2         | city         | OpenMutableBuffer | 1     | Boston    | Boston    | 17              |",