Merge branch 'main' into fixnewline

pull/24376/head
kodiakhq[bot] 2021-06-28 11:13:41 +00:00 committed by GitHub
commit b69711d5ab
7 changed files with 270 additions and 230 deletions

@@ -4,3 +4,10 @@ root = true
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
indent_style = space
[{Dockerfile*,*.proto}]
indent_size = 2
[{*.rs,*.toml}]
indent_size = 4

@@ -579,7 +579,7 @@ mod test {
use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi};
use crate::db::load_or_create_preserved_catalog;
use crate::db::load::load_or_create_preserved_catalog;
use super::*;
use std::num::NonZeroU32;

@@ -36,14 +36,14 @@ use datafusion::{
};
use entry::{Entry, SequencedEntry};
use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use metrics::{KeyValue, MetricRegistry};
use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use mutable_buffer::persistence_windows::PersistenceWindows;
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info, warn};
use parquet_file::catalog::CheckpointData;
use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
catalog::PreservedCatalog,
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
cleanup::cleanup_unreferenced_parquet_files,
metadata::IoxMetadata,
@@ -66,6 +66,7 @@ pub mod access;
pub mod catalog;
mod chunk;
mod lifecycle;
pub mod load;
pub mod pred;
mod process_clock;
mod streams;
@@ -282,93 +283,6 @@ pub struct Db {
write_buffer: Option<Arc<dyn WriteBuffer>>,
}
/// Load preserved catalog state from store.
///
/// If no catalog exists yet, a new one will be created.
///
/// **For now, if the catalog is broken, it will be wiped! (https://github.com/influxdata/influxdb_iox/issues/1522)**
pub async fn load_or_create_preserved_catalog(
db_name: &str,
object_store: Arc<ObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool,
) -> std::result::Result<(PreservedCatalog, Catalog), parquet_file::catalog::Error> {
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
];
// first try to load existing catalogs
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
match PreservedCatalog::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
{
Ok(Some(catalog)) => {
// successful load
info!("Found existing catalog for DB {}", db_name);
Ok(catalog)
}
Ok(None) => {
// no catalog yet => create one
info!(
"Found NO existing catalog for DB {}, creating new one",
db_name
);
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
}
Err(e) => {
if wipe_on_error {
// https://github.com/influxdata/influxdb_iox/issues/1522
// broken => wipe for now (at least during early iterations)
error!("cannot load catalog, so wipe it: {}", e);
PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
} else {
Err(e)
}
}
}
}
/// All the information needed to commit a database
#[derive(Debug)]
pub(crate) struct DatabaseToCommit {
@@ -1097,104 +1011,7 @@ impl CatalogProvider for Db {
}
}
/// All input required to create an empty [`Catalog`](crate::db::catalog::Catalog).
#[derive(Debug)]
pub struct CatalogEmptyInput {
domain: ::metrics::Domain,
metrics_registry: Arc<::metrics::MetricRegistry>,
metric_labels: Vec<KeyValue>,
}
impl CatalogState for Catalog {
type EmptyInput = CatalogEmptyInput;
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self {
Self::new(
Arc::from(db_name),
data.domain,
data.metrics_registry,
data.metric_labels,
)
}
fn add(
&mut self,
object_store: Arc<ObjectStore>,
info: CatalogParquetInfo,
) -> parquet_file::catalog::Result<()> {
use parquet_file::catalog::MetadataExtractFailed;
// extract relevant bits from parquet file metadata
let iox_md = info
.metadata
.read_iox_metadata()
.context(MetadataExtractFailed {
path: info.path.clone(),
})?;
// Create a parquet chunk for this chunk
let metrics = self
.metrics_registry
.register_domain_with_labels("parquet", self.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
let parquet_chunk = ParquetChunk::new(
object_store.path_from_dirs_and_filename(info.path.clone()),
object_store,
info.metadata,
metrics,
)
.context(ChunkCreationFailed {
path: info.path.clone(),
})?;
let parquet_chunk = Arc::new(parquet_chunk);
// Get partition from the catalog
// Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
let partition = self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let mut partition = partition.write();
if partition.chunk(iox_md.chunk_id).is_some() {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
}
partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
Ok(())
}
fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
let mut removed_any = false;
for partition in self.partitions() {
let mut partition = partition.write();
let mut to_remove = vec![];
for chunk in partition.chunks() {
let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
let chunk_path: DirsAndFileName = parquet.path().into();
if path == chunk_path {
to_remove.push(chunk.id());
}
}
}
for chunk_id in to_remove {
if let Err(e) = partition.drop_chunk(chunk_id) {
panic!("Chunk is gone while we've had a partition lock: {}", e);
}
removed_any = true;
}
}
if removed_any {
Ok(())
} else {
Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
}
}
}
fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
let mut files = HashMap::new();
for chunk in catalog.chunks() {
@@ -1273,7 +1090,7 @@ mod tests {
ObjectStore, ObjectStoreApi,
};
use parquet_file::{
catalog::test_helpers::{assert_catalog_state_implementation, TestCatalogState},
catalog::test_helpers::TestCatalogState,
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
@@ -2780,29 +2597,6 @@ mod tests {
.unwrap();
}
#[tokio::test]
async fn load_or_create_preserved_catalog_recovers_from_error() {
let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server_id = ServerId::try_from(1).unwrap();
let db_name = "preserved_catalog_test";
let (preserved_catalog, _catalog) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
load_or_create_preserved_catalog(db_name, object_store, server_id, metrics_registry, true)
.await
.unwrap();
}
#[tokio::test]
async fn write_one_chunk_to_preserved_catalog() {
// Test that parquet data is committed to preserved catalog
@@ -3067,21 +2861,6 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
async fn test_catalog_state() {
let metrics_registry = Arc::new(::metrics::MetricRegistry::new());
let empty_input = CatalogEmptyInput {
domain: metrics_registry.register_domain("catalog"),
metrics_registry,
metric_labels: vec![],
};
assert_catalog_state_implementation::<Catalog, _>(
empty_input,
checkpoint_data_from_catalog,
)
.await;
}
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";

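The loader function and the CatalogState implementation removed from the database module above are not dropped; they move, essentially unchanged, into the new server/src/db/load.rs shown next, while checkpoint_data_from_catalog stays behind and becomes pub(crate) so the new module's tests can reach it. Callers only need to adjust the import path; a minimal before/after sketch:

// before this change: the loader lived directly in the db module
use crate::db::load_or_create_preserved_catalog;

// after this change: the loader lives in the dedicated `load` submodule
use crate::db::load::load_or_create_preserved_catalog;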
server/src/db/load.rs (new file)
@@ -0,0 +1,254 @@
//! Functionality to load a [`Catalog`](crate::db::catalog::Catalog) and other information from a
//! [`PreservedCatalog`](parquet_file::catalog::PreservedCatalog).
use std::sync::Arc;
use data_types::server_id::ServerId;
use metrics::{KeyValue, MetricRegistry};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{error, info};
use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
};
use snafu::ResultExt;
use crate::db::catalog::chunk::ChunkStage;
use super::catalog::Catalog;
/// Load preserved catalog state from store.
///
/// If no catalog exists yet, a new one will be created.
///
/// **For now, if the catalog is broken, it will be wiped! (https://github.com/influxdata/influxdb_iox/issues/1522)**
pub async fn load_or_create_preserved_catalog(
db_name: &str,
object_store: Arc<ObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool,
) -> std::result::Result<(PreservedCatalog, Catalog), parquet_file::catalog::Error> {
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
];
// first try to load existing catalogs
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
match PreservedCatalog::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
{
Ok(Some(catalog)) => {
// successful load
info!("Found existing catalog for DB {}", db_name);
Ok(catalog)
}
Ok(None) => {
// no catalog yet => create one
info!(
"Found NO existing catalog for DB {}, creating new one",
db_name
);
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
}
Err(e) => {
if wipe_on_error {
// https://github.com/influxdata/influxdb_iox/issues/1522
// broken => wipe for now (at least during early iterations)
error!("cannot load catalog, so wipe it: {}", e);
PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
} else {
Err(e)
}
}
}
}
/// All input required to create an empty [`Catalog`](crate::db::catalog::Catalog).
#[derive(Debug)]
pub struct CatalogEmptyInput {
domain: ::metrics::Domain,
metrics_registry: Arc<::metrics::MetricRegistry>,
metric_labels: Vec<KeyValue>,
}
impl CatalogState for Catalog {
type EmptyInput = CatalogEmptyInput;
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self {
Self::new(
Arc::from(db_name),
data.domain,
data.metrics_registry,
data.metric_labels,
)
}
fn add(
&mut self,
object_store: Arc<ObjectStore>,
info: CatalogParquetInfo,
) -> parquet_file::catalog::Result<()> {
use parquet_file::catalog::MetadataExtractFailed;
// extract relevant bits from parquet file metadata
let iox_md = info
.metadata
.read_iox_metadata()
.context(MetadataExtractFailed {
path: info.path.clone(),
})?;
// Create a parquet chunk for this chunk
let metrics = self
.metrics_registry
.register_domain_with_labels("parquet", self.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
let parquet_chunk = ParquetChunk::new(
object_store.path_from_dirs_and_filename(info.path.clone()),
object_store,
info.metadata,
metrics,
)
.context(ChunkCreationFailed {
path: info.path.clone(),
})?;
let parquet_chunk = Arc::new(parquet_chunk);
// Get partition from the catalog
// Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
let partition = self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let mut partition = partition.write();
if partition.chunk(iox_md.chunk_id).is_some() {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
}
partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
Ok(())
}
fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
let mut removed_any = false;
for partition in self.partitions() {
let mut partition = partition.write();
let mut to_remove = vec![];
for chunk in partition.chunks() {
let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
let chunk_path: DirsAndFileName = parquet.path().into();
if path == chunk_path {
to_remove.push(chunk.id());
}
}
}
for chunk_id in to_remove {
if let Err(e) = partition.drop_chunk(chunk_id) {
panic!("Chunk is gone while we've had a partition lock: {}", e);
}
removed_any = true;
}
}
if removed_any {
Ok(())
} else {
Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
}
}
}
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use object_store::memory::InMemory;
use parquet_file::catalog::test_helpers::{
assert_catalog_state_implementation, TestCatalogState,
};
use crate::db::checkpoint_data_from_catalog;
use super::*;
#[tokio::test]
async fn load_or_create_preserved_catalog_recovers_from_error() {
let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server_id = ServerId::try_from(1).unwrap();
let db_name = "preserved_catalog_test";
let (preserved_catalog, _catalog) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
load_or_create_preserved_catalog(db_name, object_store, server_id, metrics_registry, true)
.await
.unwrap();
}
#[tokio::test]
async fn test_catalog_state() {
let metrics_registry = Arc::new(::metrics::MetricRegistry::new());
let empty_input = CatalogEmptyInput {
domain: metrics_registry.register_domain("catalog"),
metrics_registry,
metric_labels: vec![],
};
assert_catalog_state_implementation::<Catalog, _>(
empty_input,
checkpoint_data_from_catalog,
)
.await;
}
}

@@ -23,7 +23,7 @@ use tokio::sync::Semaphore;
use crate::{
config::{Config, DB_RULES_FILE_NAME},
db::{load_or_create_preserved_catalog, DatabaseToCommit},
db::{load::load_or_create_preserved_catalog, DatabaseToCommit},
write_buffer, DatabaseError,
};

@@ -73,7 +73,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::{load_or_create_preserved_catalog, DatabaseToCommit};
use db::{load::load_or_create_preserved_catalog, DatabaseToCommit};
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::Mutex;

@@ -10,7 +10,7 @@ use object_store::{memory::InMemory, ObjectStore};
use query::{exec::Executor, QueryDatabase};
use crate::{
db::{load_or_create_preserved_catalog, DatabaseToCommit, Db},
db::{load::load_or_create_preserved_catalog, DatabaseToCommit, Db},
write_buffer::WriteBuffer,
JobRegistry,
};
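
Taken together, a minimal sketch of how server code ends up calling the relocated loader after this change; the in-memory object store, server id, database name, and the open_catalog wrapper are illustrative assumptions that mirror the new load.rs test rather than the real server wiring:

use std::{convert::TryFrom, sync::Arc};

use data_types::server_id::ServerId;
use metrics::MetricRegistry;
use object_store::{memory::InMemory, ObjectStore};

use crate::db::load::load_or_create_preserved_catalog;

async fn open_catalog() -> Result<(), parquet_file::catalog::Error> {
    let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
    let server_id = ServerId::try_from(1).unwrap();
    let metrics_registry = Arc::new(MetricRegistry::new());

    // wipe_on_error = true reproduces the behaviour documented on the function:
    // a catalog that fails to load is wiped and recreated.
    let (_preserved_catalog, _catalog) = load_or_create_preserved_catalog(
        "example_db",
        object_store,
        server_id,
        metrics_registry,
        true,
    )
    .await?;

    Ok(())
}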