Merge branch 'main' into ntran/deduplicate

pull/24376/head
kodiakhq[bot] 2021-06-02 17:42:36 +00:00 committed by GitHub
commit 1c764c47a2
5 changed files with 95 additions and 87 deletions

View File

@ -6,7 +6,7 @@ use arrow::{
datatypes::{DataType, Int32Type},
record_batch::RecordBatch,
};
use snafu::{ensure, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use data_types::partition_metadata::TableSummary;
use data_types::timestamp::TimestampRange;
@ -73,13 +73,7 @@ impl ChunkSnapshot {
}
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
// Temporary #1295
ensure!(
self.table_name.as_ref() == table_name,
TableNotFound { table_name }
);
pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
Ok(match selection {
Selection::All => self.schema.as_ref().clone(),
Selection::Some(columns) => {
@ -104,13 +98,7 @@ impl ChunkSnapshot {
}
/// Returns a RecordBatch with the given selection
pub fn read_filter(&self, table_name: &str, selection: Selection<'_>) -> Result<RecordBatch> {
// Temporary #1295
ensure!(
self.table_name.as_ref() == table_name,
TableNotFound { table_name }
);
pub fn read_filter(&self, selection: Selection<'_>) -> Result<RecordBatch> {
Ok(match selection {
Selection::All => self.batch.clone(),
Selection::Some(columns) => {
@ -127,16 +115,7 @@ impl ChunkSnapshot {
}
/// Returns a given selection of column names from a table
pub fn column_names(
&self,
table_name: &str,
selection: Selection<'_>,
) -> Option<BTreeSet<String>> {
// Temporary #1295
if self.table_name.as_ref() != table_name {
return None;
}
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
let fields = self.schema.inner().fields().iter();
Some(match selection {
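Taken together, the hunks above drop the table_name parameter from ChunkSnapshot's accessors: a snapshot now covers exactly one table, so the temporary ensure!/early-return guards against a mismatched name (the "Temporary #1295" blocks) can go, along with the `ensure` import. A self-contained sketch of the resulting call shape, using hypothetical stand-in types rather than the IOx ones:

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for ChunkSnapshot: it owns exactly one table,
/// so its accessors no longer need (or check) a table_name argument.
struct SingleTableSnapshot {
    table_name: String,
    columns: Vec<String>,
}

impl SingleTableSnapshot {
    /// Before: fn column_names(&self, table_name: &str, selection: ...) -> Option<...>
    /// After:  only the column selection matters; the table is implied by &self.
    fn column_names(&self, selection: Option<&[&str]>) -> BTreeSet<String> {
        match selection {
            // `None` plays the role of Selection::All in this sketch.
            None => self.columns.iter().cloned().collect(),
            Some(wanted) => self
                .columns
                .iter()
                .filter(|c| wanted.contains(&c.as_str()))
                .cloned()
                .collect(),
        }
    }
}

fn main() {
    let snap = SingleTableSnapshot {
        table_name: "cpu".to_string(),
        columns: vec!["time".into(), "host".into(), "usage".into()],
    };
    // No table_name argument and no TableNotFound error path any more.
    println!("{}: {:?}", snap.table_name, snap.column_names(Some(&["time", "usage"])));
}
```

The per-call name check also disappears from the other chunk types in this commit, e.g. the has_table helper removed in the next file.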

View File

@ -111,11 +111,6 @@ impl Chunk {
self.table.name()
}
/// Return true if this chunk includes the given table
pub fn has_table(&self, table_name: &str) -> bool {
self.table_name() == table_name
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {
@ -148,13 +143,14 @@ impl Chunk {
/// Return stream of data read from parquet file of the given table
pub fn read_filter(
&self,
table_name: &str,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
self.table
.read_filter(predicate, selection)
.context(ReadParquet { table_name })
.context(ReadParquet {
table_name: self.table_name(),
})
}
/// The total number of rows in all row groups in all tables in this chunk.
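The parquet chunk gets the same treatment: read_filter loses its table_name parameter, and the ReadParquet error context is filled from self.table_name() instead of the argument. A minimal compilable sketch of that snafu pattern, with a hypothetical Chunk and std::io::Error standing in for the real parquet error (assuming snafu 0.6-style context selectors, as the surrounding code uses):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("error reading parquet of table {}: {}", table_name, source))]
    ReadParquet {
        table_name: String,
        source: std::io::Error,
    },
}

struct Chunk {
    table_name: String,
}

impl Chunk {
    fn table_name(&self) -> &str {
        &self.table_name
    }

    // Stand-in for the real parquet read, which can fail.
    fn read_bytes(&self) -> std::io::Result<Vec<u8>> {
        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "file missing"))
    }

    /// The error context is populated from &self rather than from a
    /// caller-supplied table_name, mirroring the hunk above.
    fn read_filter(&self) -> Result<Vec<u8>, Error> {
        self.read_bytes().context(ReadParquet {
            table_name: self.table_name(),
        })
    }
}

fn main() {
    let chunk = Chunk { table_name: "cpu".to_string() };
    println!("{}", chunk.read_filter().unwrap_err());
}
```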

View File

@ -602,7 +602,7 @@ impl Db {
// load table into the new chunk one by one.
debug!(%partition_key, %table_name, %chunk_id, table=%table_summary.name, "loading table to read buffer");
let batch = mb_chunk
.read_filter(table_name, Selection::All)
.read_filter(Selection::All)
// It is probably reasonable to recover from this error
// (reset the chunk state to Open) but until that is
// implemented (and tested) just panic

View File

@ -300,9 +300,7 @@ impl PartitionChunk for DbChunk {
match &self.state {
State::MutableBuffer { chunk, .. } => {
let batch = chunk
.read_filter(table_name, selection)
.context(MutableBufferChunk)?;
let batch = chunk.read_filter(selection).context(MutableBufferChunk)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
@ -333,11 +331,13 @@ impl PartitionChunk for DbChunk {
schema.into(),
)))
}
State::ParquetFile { chunk, .. } => chunk
.read_filter(table_name, predicate, selection)
.context(ParquetFileChunkError {
chunk_id: self.id(),
}),
State::ParquetFile { chunk, .. } => {
chunk
.read_filter(predicate, selection)
.context(ParquetFileChunkError {
chunk_id: self.id(),
})
}
}
}
@ -353,7 +353,7 @@ impl PartitionChunk for DbChunk {
// TODO: Support predicates
return Ok(None);
}
Ok(chunk.column_names(table_name, columns))
Ok(chunk.column_names(columns))
}
State::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(&predicate) {

View File

@ -75,6 +75,7 @@ use bytes::BytesMut;
use cached::proc_macro::cached;
use db::load_or_create_preserved_catalog;
use futures::stream::TryStreamExt;
use object_store::path::Path;
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
@ -124,60 +125,87 @@ type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub enum Error {
#[snafu(display("Server error: {}", source))]
ServerError { source: std::io::Error },
#[snafu(display("database not found: {}", db_name))]
DatabaseNotFound { db_name: String },
#[snafu(display("invalid database: {}", source))]
InvalidDatabaseName { source: DatabaseNameError },
#[snafu(display("database error: {}", source))]
UnknownDatabaseError { source: DatabaseError },
#[snafu(display("getting mutable buffer chunk: {}", source))]
MutableBufferChunk { source: DatabaseError },
#[snafu(display("no local buffer for database: {}", db))]
NoLocalBuffer { db: String },
#[snafu(display("unable to get connection to remote server: {}", server))]
UnableToGetConnection {
server: String,
source: DatabaseError,
},
#[snafu(display("error replicating to remote: {}", source))]
ErrorReplicating { source: DatabaseError },
#[snafu(display("id already set"))]
IdAlreadySet { id: ServerId },
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
#[snafu(display("error serializing configuration {}", source))]
ErrorSerializing {
#[snafu(display("error serializing database rules to protobuf: {}", source))]
ErrorSerializingRulesProtobuf {
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
ErrorDeserializingRulesProtobuf {
source: generated_types::database_rules::DecodeError,
},
#[snafu(display("error deserializing configuration {}", source))]
ErrorDeserializing { source: serde_json::Error },
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display(
"no database configuration present in directory that contains data: {:?}",
location
))]
NoDatabaseConfigError { location: object_store::path::Path },
#[snafu(display("database already exists"))]
DatabaseAlreadyExists { db_name: String },
#[snafu(display("error converting line protocol to flatbuffers: {}", source))]
LineConversion { source: entry::Error },
#[snafu(display("error decoding entry flatbuffers: {}", source))]
DecodingEntry {
source: flatbuffers::InvalidFlatbuffer,
},
#[snafu(display("shard not found: {}", shard_id))]
ShardNotFound { shard_id: ShardId },
#[snafu(display("hard buffer limit reached"))]
HardLimitReached {},
#[snafu(display("no remote configured for node group: {:?}", node_group))]
NoRemoteConfigured { node_group: NodeGroup },
#[snafu(display("all remotes failed connecting: {:?}", errors))]
NoRemoteReachable {
errors: HashMap<GRpcConnectionString, ConnectionManagerError>,
},
#[snafu(display("remote error: {}", source))]
RemoteError { source: ConnectionManagerError },
#[snafu(display("cannot load catalog: {}", source))]
CatalogLoadError { source: DatabaseError },
}
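Several variants here (UnknownDatabaseError, MutableBufferChunk, CatalogLoadError, ...) carry a DatabaseError, i.e. Box<dyn std::error::Error + Send + Sync + 'static>. Constructing one from a concrete error uses the map_err(|e| Box::new(e) as _) coercion plus a snafu context selector, the same pattern load_database_config applies further down in this diff. A minimal compilable sketch with hypothetical names:

```rust
use snafu::{ResultExt, Snafu};

type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("cannot load catalog: {}", source))]
    CatalogLoadError { source: DatabaseError },
}

// Stand-in for a lower-level call that fails with some concrete error type.
fn load_or_create_preserved_catalog() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(
        std::io::ErrorKind::InvalidData,
        "catalog file is corrupt",
    ))
}

fn load() -> Result<(), Error> {
    // Box the concrete error, coerce it to the DatabaseError trait object
    // (`as _` lets the target type be inferred), then attach the context.
    load_or_create_preserved_catalog()
        .map_err(|e| Box::new(e) as _)
        .context(CatalogLoadError)?;
    Ok(())
}

fn main() {
    println!("{}", load().unwrap_err());
}
```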
@ -463,7 +491,7 @@ impl<M: ConnectionManager> Server<M> {
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializing)?;
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let len = data.len();
@ -514,49 +542,11 @@ impl<M: ConnectionManager> Server<M> {
path.set_file_name(DB_RULES_FILE_NAME);
tokio::task::spawn(async move {
let mut res = get_database_config_bytes(&path, &store).await;
while let Err(e) = &res {
if let Error::NoDatabaseConfigError { location } = e {
warn!(?location, "{}", e);
return;
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(
STORE_ERROR_PAUSE_SECONDS,
))
.await;
res = get_database_config_bytes(&path, &store).await;
if let Err(e) =
Self::load_database_config(server_id, store, config, exec, path).await
{
error!(%e, "cannot load database");
}
let res = res.unwrap(); // it's not an error, otherwise we'd be still looping above
let res = res.freeze();
match decode_database_rules(res) {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
Ok(rules) => {
match load_or_create_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
{
Err(e) => error!("cannot load database: {}", e),
Ok(preserved_catalog) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => {
handle.commit(server_id, store, exec, preserved_catalog)
}
},
}
}
};
})
})
.collect();
@ -566,6 +556,49 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
async fn load_database_config(
server_id: ServerId,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
path: Path,
) -> Result<()> {
let serialized_rules = loop {
match get_database_config_bytes(&path, &store).await {
Ok(data) => break data,
Err(e) => {
if let Error::NoDatabaseConfigError { location } = &e {
warn!(?location, "{}", e);
return Ok(());
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
.await;
}
}
};
let rules = decode_database_rules(serialized_rules.freeze())
.context(ErrorDeserializingRulesProtobuf)?;
let preserved_catalog = load_or_create_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let handle = config.create_db(rules)?;
handle.commit(server_id, store, exec, preserved_catalog);
Ok(())
}
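The new load_database_config collects what used to be a nested match pyramid inside the spawned task: an async fn returning Result lets each step use ? (or return early for the missing-config case), keeps the transient-error retry loop local, and leaves the spawn site with a single if let Err(e) = ... { error!(...) }. A runnable sketch of that shape with hypothetical stand-ins (assumes the tokio crate with its timer and macros enabled; eprintln! stands in for the tracing macros):

```rust
use std::{sync::Arc, time::Duration};

// Hypothetical stand-in for the object-store lookup; fails twice before
// succeeding so the retry loop below has something to do.
async fn get_config_bytes(attempt: &mut u32) -> Result<Vec<u8>, String> {
    *attempt += 1;
    if *attempt < 3 {
        Err(format!("transient store error (attempt {})", *attempt))
    } else {
        Ok(b"rules".to_vec())
    }
}

// The shape of the refactor above: the spawned task body becomes an async fn
// returning Result, so the retry loop stays local and the spawn site logs a
// failure exactly once.
async fn load_database_config(db_name: Arc<String>) -> Result<(), String> {
    let mut attempt = 0;
    let serialized_rules = loop {
        match get_config_bytes(&mut attempt).await {
            Ok(data) => break data,
            Err(e) => {
                eprintln!("error getting database config for {db_name}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    };
    println!("loaded {} bytes of rules for {db_name}", serialized_rules.len());
    Ok(())
}

#[tokio::main]
async fn main() {
    let db_name = Arc::new("my_db".to_string());
    tokio::task::spawn(async move {
        if let Err(e) = load_database_config(db_name).await {
            eprintln!("cannot load database: {e}");
        }
    })
    .await
    .unwrap();
}
```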
/// `write_lines` takes in raw line protocol and converts it to a collection
/// of ShardedEntry which are then sent to other IOx servers based on
/// the ShardConfig or sent to the local database for buffering in the