diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs
index 85ae359430..82a2f96a97 100644
--- a/mutable_buffer/src/chunk/snapshot.rs
+++ b/mutable_buffer/src/chunk/snapshot.rs
@@ -6,7 +6,7 @@ use arrow::{
     datatypes::{DataType, Int32Type},
     record_batch::RecordBatch,
 };
-use snafu::{ensure, ResultExt, Snafu};
+use snafu::{ResultExt, Snafu};
 
 use data_types::partition_metadata::TableSummary;
 use data_types::timestamp::TimestampRange;
@@ -73,13 +73,7 @@ impl ChunkSnapshot {
     }
 
     /// Return Schema for the specified table / columns
-    pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
-        // Temporary #1295
-        ensure!(
-            self.table_name.as_ref() == table_name,
-            TableNotFound { table_name }
-        );
-
+    pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
         Ok(match selection {
             Selection::All => self.schema.as_ref().clone(),
             Selection::Some(columns) => {
@@ -104,13 +98,7 @@ impl ChunkSnapshot {
     }
 
     /// Returns a RecordBatch with the given selection
-    pub fn read_filter(&self, table_name: &str, selection: Selection<'_>) -> Result<RecordBatch> {
-        // Temporary #1295
-        ensure!(
-            self.table_name.as_ref() == table_name,
-            TableNotFound { table_name }
-        );
-
+    pub fn read_filter(&self, selection: Selection<'_>) -> Result<RecordBatch> {
         Ok(match selection {
             Selection::All => self.batch.clone(),
             Selection::Some(columns) => {
@@ -127,16 +115,7 @@ impl ChunkSnapshot {
     }
 
     /// Returns a given selection of column names from a table
-    pub fn column_names(
-        &self,
-        table_name: &str,
-        selection: Selection<'_>,
-    ) -> Option<BTreeSet<String>> {
-        // Temporary #1295
-        if self.table_name.as_ref() != table_name {
-            return None;
-        }
-
+    pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
         let fields = self.schema.inner().fields().iter();
 
         Some(match selection {
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index f738c738b4..2d31bc203d 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -111,11 +111,6 @@ impl Chunk {
         self.table.name()
     }
 
-    /// Return true if this chunk includes the given table
-    pub fn has_table(&self, table_name: &str) -> bool {
-        self.table_name() == table_name
-    }
-
     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
     pub fn size(&self) -> usize {
@@ -148,13 +143,14 @@ impl Chunk {
     /// Return stream of data read from parquet file of the given table
     pub fn read_filter(
         &self,
-        table_name: &str,
         predicate: &Predicate,
         selection: Selection<'_>,
     ) -> Result<SendableRecordBatchStream> {
         self.table
             .read_filter(predicate, selection)
-            .context(ReadParquet { table_name })
+            .context(ReadParquet {
+                table_name: self.table_name(),
+            })
     }
 
     /// The total number of rows in all row groups in all tables in this chunk.
diff --git a/server/src/db.rs b/server/src/db.rs
index ea882bb232..21b7c725a9 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -602,7 +602,7 @@ impl Db {
         // load table into the new chunk one by one.
         debug!(%partition_key, %table_name, %chunk_id, table=%table_summary.name, "loading table to read buffer");
         let batch = mb_chunk
-            .read_filter(table_name, Selection::All)
+            .read_filter(Selection::All)
             // It is probably reasonable to recover from this error
             // (reset the chunk state to Open) but until that is
             // implemented (and tested) just panic
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 0ea896185b..22c0252d43 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -300,9 +300,7 @@ impl PartitionChunk for DbChunk {

         match &self.state {
             State::MutableBuffer { chunk, .. } => {
-                let batch = chunk
-                    .read_filter(table_name, selection)
-                    .context(MutableBufferChunk)?;
+                let batch = chunk.read_filter(selection).context(MutableBufferChunk)?;

                 Ok(Box::pin(MemoryStream::new(vec![batch])))
             }
@@ -333,11 +331,13 @@
                     schema.into(),
                 )))
             }
-            State::ParquetFile { chunk, .. } => chunk
-                .read_filter(table_name, predicate, selection)
-                .context(ParquetFileChunkError {
-                    chunk_id: self.id(),
-                }),
+            State::ParquetFile { chunk, .. } => {
+                chunk
+                    .read_filter(predicate, selection)
+                    .context(ParquetFileChunkError {
+                        chunk_id: self.id(),
+                    })
+            }
         }
     }

@@ -353,7 +353,7 @@
                     // TODO: Support predicates
                     return Ok(None);
                 }
-                Ok(chunk.column_names(table_name, columns))
+                Ok(chunk.column_names(columns))
             }
             State::ReadBuffer { chunk, .. } => {
                 let rb_predicate = match to_read_buffer_predicate(&predicate) {
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 1ad36ee919..4e7d8c0ce5 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -75,6 +75,7 @@ use bytes::BytesMut;
 use cached::proc_macro::cached;
 use db::load_or_create_preserved_catalog;
 use futures::stream::TryStreamExt;
+use object_store::path::Path;
 use observability_deps::tracing::{debug, error, info, warn};
 use parking_lot::Mutex;
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -124,60 +125,87 @@ type DatabaseError = Box<dyn std::error::Error + Send + Sync>;
 pub enum Error {
     #[snafu(display("Server error: {}", source))]
     ServerError { source: std::io::Error },
+
     #[snafu(display("database not found: {}", db_name))]
     DatabaseNotFound { db_name: String },
+
     #[snafu(display("invalid database: {}", source))]
     InvalidDatabaseName { source: DatabaseNameError },
+
     #[snafu(display("database error: {}", source))]
     UnknownDatabaseError { source: DatabaseError },
+
     #[snafu(display("getting mutable buffer chunk: {}", source))]
     MutableBufferChunk { source: DatabaseError },
+
     #[snafu(display("no local buffer for database: {}", db))]
     NoLocalBuffer { db: String },
+
     #[snafu(display("unable to get connection to remote server: {}", server))]
     UnableToGetConnection {
         server: String,
         source: DatabaseError,
     },
+
     #[snafu(display("error replicating to remote: {}", source))]
     ErrorReplicating { source: DatabaseError },
+
     #[snafu(display("id already set"))]
     IdAlreadySet { id: ServerId },
+
     #[snafu(display("unable to use server until id is set"))]
     IdNotSet,
-    #[snafu(display("error serializing configuration {}", source))]
-    ErrorSerializing {
+
+    #[snafu(display("error serializing database rules to protobuf: {}", source))]
+    ErrorSerializingRulesProtobuf {
         source: generated_types::database_rules::EncodeError,
     },
+
+    #[snafu(display("error deserializing database rules from protobuf: {}", source))]
+    ErrorDeserializingRulesProtobuf {
+        source: generated_types::database_rules::DecodeError,
+    },
+
     #[snafu(display("error deserializing configuration {}", source))]
     ErrorDeserializing { source: serde_json::Error },
+
#[snafu(display("store error: {}", source))] StoreError { source: object_store::Error }, + #[snafu(display( "no database configuration present in directory that contains data: {:?}", location ))] NoDatabaseConfigError { location: object_store::path::Path }, + #[snafu(display("database already exists"))] DatabaseAlreadyExists { db_name: String }, + #[snafu(display("error converting line protocol to flatbuffers: {}", source))] LineConversion { source: entry::Error }, + #[snafu(display("error decoding entry flatbuffers: {}", source))] DecodingEntry { source: flatbuffers::InvalidFlatbuffer, }, + #[snafu(display("shard not found: {}", shard_id))] ShardNotFound { shard_id: ShardId }, + #[snafu(display("hard buffer limit reached"))] HardLimitReached {}, + #[snafu(display("no remote configured for node group: {:?}", node_group))] NoRemoteConfigured { node_group: NodeGroup }, + #[snafu(display("all remotes failed connecting: {:?}", errors))] NoRemoteReachable { errors: HashMap, }, + #[snafu(display("remote error: {}", source))] RemoteError { source: ConnectionManagerError }, + #[snafu(display("cannot load catalog: {}", source))] CatalogLoadError { source: DatabaseError }, } @@ -463,7 +491,7 @@ impl Server { let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); let mut data = BytesMut::new(); - encode_database_rules(rules, &mut data).context(ErrorSerializing)?; + encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?; let len = data.len(); @@ -514,49 +542,11 @@ impl Server { path.set_file_name(DB_RULES_FILE_NAME); tokio::task::spawn(async move { - let mut res = get_database_config_bytes(&path, &store).await; - while let Err(e) = &res { - if let Error::NoDatabaseConfigError { location } = e { - warn!(?location, "{}", e); - return; - } - error!( - "error getting database config {:?} from object store: {}", - path, e - ); - tokio::time::sleep(tokio::time::Duration::from_secs( - STORE_ERROR_PAUSE_SECONDS, - )) - .await; - res = get_database_config_bytes(&path, &store).await; + if let Err(e) = + Self::load_database_config(server_id, store, config, exec, path).await + { + error!(%e, "cannot load database"); } - - let res = res.unwrap(); // it's not an error, otherwise we'd be still looping above - let res = res.freeze(); - - match decode_database_rules(res) { - Err(e) => { - error!("error parsing database config {:?} from store: {}", path, e) - } - Ok(rules) => { - match load_or_create_preserved_catalog( - rules.db_name(), - Arc::clone(&store), - server_id, - config.metrics_registry(), - ) - .await - { - Err(e) => error!("cannot load database: {}", e), - Ok(preserved_catalog) => match config.create_db(rules) { - Err(e) => error!("error adding database to config: {}", e), - Ok(handle) => { - handle.commit(server_id, store, exec, preserved_catalog) - } - }, - } - } - }; }) }) .collect(); @@ -566,6 +556,49 @@ impl Server { Ok(()) } + async fn load_database_config( + server_id: ServerId, + store: Arc, + config: Arc, + exec: Arc, + path: Path, + ) -> Result<()> { + let serialized_rules = loop { + match get_database_config_bytes(&path, &store).await { + Ok(data) => break data, + Err(e) => { + if let Error::NoDatabaseConfigError { location } = &e { + warn!(?location, "{}", e); + return Ok(()); + } + error!( + "error getting database config {:?} from object store: {}", + path, e + ); + tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS)) + .await; + } + } + }; + let rules = decode_database_rules(serialized_rules.freeze()) + 
+            .context(ErrorDeserializingRulesProtobuf)?;
+
+        let preserved_catalog = load_or_create_preserved_catalog(
+            rules.db_name(),
+            Arc::clone(&store),
+            server_id,
+            config.metrics_registry(),
+        )
+        .await
+        .map_err(|e| Box::new(e) as _)
+        .context(CatalogLoadError)?;
+
+        let handle = config.create_db(rules)?;
+        handle.commit(server_id, store, exec, preserved_catalog);
+
+        Ok(())
+    }
+
     /// `write_lines` takes in raw line protocol and converts it to a collection
     /// of ShardedEntry which are then sent to other IOx servers based on
     /// the ShardConfig or sent to the local database for buffering in the
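
For reviewers, a minimal sketch of how a call site changes under this refactor. The function, the `internal_types::selection::Selection` import path, and the module path for `ChunkSnapshot` are illustrative assumptions rather than part of the patch: since each `ChunkSnapshot` now covers exactly one table, the redundant `table_name` argument (and its `TableNotFound` check) drops out of `table_schema`, `read_filter`, and `column_names`.

// Hypothetical caller of the snapshot API after this change (assumed import paths).
use internal_types::selection::Selection;
use mutable_buffer::chunk::snapshot::ChunkSnapshot;

fn dump_snapshot(snapshot: &ChunkSnapshot) {
    // Before: snapshot.read_filter("cpu", Selection::All), where the snapshot
    // re-checked that "cpu" matched its single table.
    // After: the snapshot is already scoped to one table, so only the column
    // selection is passed.
    if let Ok(batch) = snapshot.read_filter(Selection::All) {
        println!("{} rows", batch.num_rows());
    }
    if let Some(names) = snapshot.column_names(Selection::All) {
        println!("columns: {:?}", names);
    }
}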