Merge pull request #301 from influxdata/cn/extract-restore-wal-function

pull/24376/head
Carol (Nichols || Goulding) 2020-09-18 10:14:08 -04:00 committed by GitHub
commit b774e3b08c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 231 additions and 111 deletions

View File

@ -5,7 +5,7 @@ use delorean_generated_types::wal as wb;
use delorean_line_parser::{FieldValue, ParsedLine}; use delorean_line_parser::{FieldValue, ParsedLine};
use delorean_storage_interface::{Database, DatabaseStore}; use delorean_storage_interface::{Database, DatabaseStore};
use delorean_wal::WalBuilder; use delorean_wal::{Entry as WalEntry, Result as WalResult, WalBuilder};
use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails}; use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
@ -74,12 +74,11 @@ pub enum Error {
source: delorean_wal::Error, source: delorean_wal::Error,
}, },
#[snafu(display( #[snafu(display("Error recovering WAL for database {}: {}", database, source))]
"Error recovering WAL for database {} on partition {}", WalRecoverError {
database, database: String,
partition_id source: RestorationError,
))] },
WalRecoverError { database: String, partition_id: u32 },
#[snafu(display( #[snafu(display(
"Error recovering WAL for partition {} on table {}", "Error recovering WAL for partition {} on table {}",
@ -97,8 +96,17 @@ pub enum Error {
err: std::io::Error, err: std::io::Error,
}, },
#[snafu(display("Schema mismatch: Write with the following errors: {}", error))] #[snafu(display(
SchemaMismatch { error: String }, "Schema mismatch: for column {}: can't insert {} into column with type {}",
column_id,
inserted_value_type,
existing_column_type
))]
SchemaMismatch {
column_id: usize,
existing_column_type: String,
inserted_value_type: String,
},
#[snafu(display("Database {} doesn't exist", database))] #[snafu(display("Database {} doesn't exist", database))]
DatabaseNotFound { database: String }, DatabaseNotFound { database: String },
@ -106,8 +114,11 @@ pub enum Error {
#[snafu(display("Partition {} is full", partition_id))] #[snafu(display("Partition {} is full", partition_id))]
PartitionFull { partition_id: String }, PartitionFull { partition_id: String },
#[snafu(display("Table {} not found", table))] #[snafu(display("Table {} not found in dictionary", table))]
TableNotFound { table: String }, TableNotFoundInDictionary { table: String },
#[snafu(display("Table {} not found in partition {}", table, partition))]
TableNotFoundInPartition { table: u32, partition: u32 },
#[snafu(display("Internal Error: Column {} not found", column_id))] #[snafu(display("Internal Error: Column {} not found", column_id))]
InternaColumnNotFound { column_id: u32 }, InternaColumnNotFound { column_id: u32 },
@ -330,82 +341,24 @@ impl Db {
.await .await
.context(OpeningWal { database: &name })?; .context(OpeningWal { database: &name })?;
let mut row_count = 0;
let mut dict_values = 0;
let mut tables = HashSet::new();
// TODO: check wal metadata format // TODO: check wal metadata format
let entries = wal_builder let entries = wal_builder
.entries() .entries()
.context(LoadingWal { database: &name })?; .context(LoadingWal { database: &name })?;
let mut partitions = HashMap::new();
let mut next_partition_id = 0;
for entry in entries {
let entry = entry.context(LoadingWal { database: &name })?;
let bytes = entry.as_data();
let entry = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&bytes); let (partitions, next_partition_id, stats) =
restore_partitions_from_wal(entries).context(WalRecoverError { database: &name })?;
if let Some(entries) = entry.entries() {
for entry in entries {
if let Some(po) = entry.partition_open() {
let id = po.id();
if id > next_partition_id {
next_partition_id = id;
}
let p = Partition::new(id, po.name().unwrap().to_string());
partitions.insert(id, p);
} else if let Some(_ps) = entry.partition_snapshot_started() {
todo!("handle partition snapshot");
} else if let Some(_pf) = entry.partition_snapshot_finished() {
todo!("handle partition snapshot finished")
} else if let Some(da) = entry.dictionary_add() {
let p =
partitions
.get_mut(&da.partition_id())
.context(WalRecoverError {
database: &name,
partition_id: da.partition_id(),
})?;
dict_values += 1;
p.intern_new_dict_entry(da.value().unwrap());
} else if let Some(sa) = entry.schema_append() {
let tid = sa.table_id();
tables.insert(tid);
let p =
partitions
.get_mut(&sa.partition_id())
.context(WalRecoverError {
database: &name,
partition_id: sa.partition_id(),
})?;
p.append_wal_schema(sa.table_id(), sa.column_id(), sa.column_type());
} else if let Some(row) = entry.write() {
let p =
partitions
.get_mut(&row.partition_id())
.context(WalRecoverError {
database: &name,
partition_id: row.partition_id(),
})?;
row_count += 1;
p.add_wal_row(row.table_id(), &row.values().unwrap())?;
}
}
}
}
let elapsed = now.elapsed(); let elapsed = now.elapsed();
info!( info!(
"{} database loaded {} rows in {:?} with {} dictionary adds in {} tables", "{} database loaded {} rows in {:?} with {} dictionary adds in {} tables",
&name, &name,
row_count, stats.row_count,
elapsed, elapsed,
dict_values, stats.dict_values,
tables.len() stats.tables.len(),
); );
let partitions: Vec<_> = partitions.into_iter().map(|(_, p)| p).collect();
info!( info!(
"{} database partition count: {}, next_id {}", "{} database partition count: {}, next_id {}",
&name, &name,
@ -423,6 +376,108 @@ impl Db {
} }
} }
#[derive(Debug, Snafu)]
pub enum RestorationError {
#[snafu(display("Could not read WAL entry: {}", source))]
WalEntryRead { source: delorean_wal::Error },
#[snafu(display("Partition {} not found", partition))]
PartitionNotFound { partition: u32 },
#[snafu(display("Table {} not found", table))]
TableNotFound { table: u32 },
#[snafu(display("Column {} not found", column))]
ColumnNotFound { column: u32 },
#[snafu(display(
"Column {} said it was type {} but extracting a value of that type failed",
column,
expected
))]
WalValueTypeMismatch { column: u32, expected: String },
#[snafu(display(
"Column type mismatch for column {}: can't insert {} into column with type {}",
column,
inserted_value_type,
existing_column_type
))]
ColumnTypeMismatch {
column: usize,
existing_column_type: String,
inserted_value_type: String,
},
}
/// Given a set of WAL entries, restore them into a set of Partitions.
pub fn restore_partitions_from_wal(
wal_entries: impl Iterator<Item = WalResult<WalEntry>>,
) -> Result<(Vec<Partition>, u32, RestorationStats), RestorationError> {
let mut stats = RestorationStats::default();
let mut partitions = HashMap::new();
let mut next_partition_id = 0;
for wal_entry in wal_entries {
let wal_entry = wal_entry.context(WalEntryRead)?;
let bytes = wal_entry.as_data();
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&bytes);
if let Some(entries) = batch.entries() {
for entry in entries {
if let Some(po) = entry.partition_open() {
let id = po.id();
if id > next_partition_id {
next_partition_id = id;
}
let p = Partition::new(id, po.name().unwrap().to_string());
partitions.insert(id, p);
} else if let Some(_ps) = entry.partition_snapshot_started() {
todo!("handle partition snapshot");
} else if let Some(_pf) = entry.partition_snapshot_finished() {
todo!("handle partition snapshot finished")
} else if let Some(da) = entry.dictionary_add() {
let p = partitions
.get_mut(&da.partition_id())
.context(PartitionNotFound {
partition: da.partition_id(),
})?;
stats.dict_values += 1;
p.intern_new_dict_entry(da.value().unwrap());
} else if let Some(sa) = entry.schema_append() {
let tid = sa.table_id();
stats.tables.insert(tid);
let p = partitions
.get_mut(&sa.partition_id())
.context(PartitionNotFound {
partition: sa.partition_id(),
})?;
p.append_wal_schema(sa.table_id(), sa.column_id(), sa.column_type());
} else if let Some(row) = entry.write() {
let p = partitions
.get_mut(&row.partition_id())
.context(PartitionNotFound {
partition: row.partition_id(),
})?;
stats.row_count += 1;
p.add_wal_row(row.table_id(), &row.values().unwrap())?;
}
}
}
}
let partitions: Vec<_> = partitions.into_iter().map(|(_, p)| p).collect();
Ok((partitions, next_partition_id, stats))
}
#[derive(Default, Debug)]
pub struct RestorationStats {
row_count: usize,
dict_values: usize,
tables: HashSet<u32>,
}
#[async_trait] #[async_trait]
impl Database for Db { impl Database for Db {
type Error = Error; type Error = Error;
@ -597,7 +652,7 @@ struct ArrowTable {
} }
#[derive(Debug)] #[derive(Debug)]
struct Partition { pub struct Partition {
name: String, name: String,
id: u32, id: u32,
/// `dictionary` maps &str -> u32. The u32s are used in place of /// `dictionary` maps &str -> u32. The u32s are used in place of
@ -649,11 +704,11 @@ impl Partition {
&mut self, &mut self,
table_id: u32, table_id: u32,
values: &flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<wb::Value<'_>>>, values: &flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<wb::Value<'_>>>,
) -> Result<()> { ) -> Result<(), RestorationError> {
let t = self.tables.get_mut(&table_id).context(WalPartitionError { let t = self
partition_id: self.id, .tables
table_id, .get_mut(&table_id)
})?; .context(TableNotFound { table: table_id })?;
t.add_wal_row(values) t.add_wal_row(values)
} }
@ -737,14 +792,21 @@ impl Partition {
} }
fn table_to_arrow(&self, table_name: &str) -> Result<RecordBatch> { fn table_to_arrow(&self, table_name: &str) -> Result<RecordBatch> {
let table_id = self.dictionary.get(table_name).context(TableNotFound { let table_id = self
table: table_name.to_string(), .dictionary
})?; .get(table_name)
.context(TableNotFoundInDictionary {
table: table_name.to_string(),
})?;
let table_id = u32::try_from(table_id.to_usize()).context(IdConversionError {})?; let table_id = u32::try_from(table_id.to_usize()).context(IdConversionError {})?;
let table = self.tables.get(&table_id).context(TableNotFound { let table = self
table: format!("id: {}", table_id), .tables
})?; .get(&table_id)
.context(TableNotFoundInPartition {
table: table_id,
partition: self.id,
})?;
table.to_arrow(|id| self.lookup_id(id)) table.to_arrow(|id| self.lookup_id(id))
} }
} }
@ -762,10 +824,36 @@ enum Value<'a> {
FieldValue(&'a delorean_line_parser::FieldValue<'a>), FieldValue(&'a delorean_line_parser::FieldValue<'a>),
} }
impl<'a> Value<'a> {
fn type_description(&self) -> &'static str {
match self {
Value::TagValueId(_) => "tag",
Value::FieldValue(FieldValue::I64(_)) => "i64",
Value::FieldValue(FieldValue::F64(_)) => "f64",
Value::FieldValue(FieldValue::Boolean(_)) => "bool",
Value::FieldValue(FieldValue::String(_)) => "String",
}
}
}
fn symbol_to_u32(sym: DefaultSymbol) -> u32 { fn symbol_to_u32(sym: DefaultSymbol) -> u32 {
sym.to_usize() as u32 sym.to_usize() as u32
} }
fn type_description(value: wb::ColumnValue) -> &'static str {
use wb::ColumnValue::*;
match value {
NONE => "none",
TagValue => "tag",
I64Value => "i64",
U64Value => "u64",
F64Value => "f64",
BoolValue => "bool",
StringValue => "String",
}
}
#[derive(Debug)] #[derive(Debug)]
struct Table { struct Table {
id: u32, id: u32,
@ -825,50 +913,59 @@ impl Table {
fn add_wal_row( fn add_wal_row(
&mut self, &mut self,
values: &flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<wb::Value<'_>>>, values: &flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<wb::Value<'_>>>,
) -> Result<()> { ) -> Result<(), RestorationError> {
let row_count = self.row_count(); let row_count = self.row_count();
for value in values { for value in values {
let col = self let col = self
.columns .columns
.get_mut(value.column_index() as usize) .get_mut(value.column_index() as usize)
.context(WalColumnError { .context(ColumnNotFound {
column_id: value.column_index(), column: value.column_index(),
})?; })?;
match (col, value.value_type()) { match (col, value.value_type()) {
(Column::Tag(vals), wb::ColumnValue::TagValue) => { (Column::Tag(vals), wb::ColumnValue::TagValue) => {
let v = value.value_as_tag_value().context(WalColumnError { let v = value.value_as_tag_value().context(WalValueTypeMismatch {
column_id: value.column_index(), column: value.column_index(),
expected: "tag",
})?; })?;
vals.push(Some(v.value())); vals.push(Some(v.value()));
} }
(Column::Bool(vals), wb::ColumnValue::BoolValue) => { (Column::Bool(vals), wb::ColumnValue::BoolValue) => {
let v = value.value_as_bool_value().context(WalColumnError { let v = value.value_as_bool_value().context(WalValueTypeMismatch {
column_id: value.column_index(), column: value.column_index(),
expected: "bool",
})?; })?;
vals.push(Some(v.value())); vals.push(Some(v.value()));
} }
(Column::String(vals), wb::ColumnValue::StringValue) => { (Column::String(vals), wb::ColumnValue::StringValue) => {
let v = value.value_as_string_value().context(WalColumnError { let v = value
column_id: value.column_index(), .value_as_string_value()
})?; .context(WalValueTypeMismatch {
column: value.column_index(),
expected: "String",
})?;
vals.push(Some(v.value().unwrap().to_string())); vals.push(Some(v.value().unwrap().to_string()));
} }
(Column::I64(vals), wb::ColumnValue::I64Value) => { (Column::I64(vals), wb::ColumnValue::I64Value) => {
let v = value.value_as_i64value().context(WalColumnError { let v = value.value_as_i64value().context(WalValueTypeMismatch {
column_id: value.column_index(), column: value.column_index(),
expected: "i64",
})?; })?;
vals.push(Some(v.value())); vals.push(Some(v.value()));
} }
(Column::F64(vals), wb::ColumnValue::F64Value) => { (Column::F64(vals), wb::ColumnValue::F64Value) => {
let v = value.value_as_f64value().context(WalColumnError { let v = value.value_as_f64value().context(WalValueTypeMismatch {
column_id: value.column_index(), column: value.column_index(),
expected: "f64",
})?; })?;
vals.push(Some(v.value())); vals.push(Some(v.value()));
} }
_ => { (existing_column, inserted_value) => {
return SchemaMismatch { return ColumnTypeMismatch {
error: "column type mismatch recovering from WAL", column: value.column_index(),
existing_column_type: existing_column.type_description(),
inserted_value_type: type_description(inserted_value),
} }
.fail() .fail()
} }
@ -906,8 +1003,8 @@ impl Table {
// insert new columns and validate existing ones // insert new columns and validate existing ones
for val in values { for val in values {
let column = match self.column_id_to_index.get(&val.id) { let (column, column_id) = match self.column_id_to_index.get(&val.id) {
Some(idx) => &mut self.columns[*idx], Some(idx) => (&mut self.columns[*idx], *idx),
None => { None => {
// Add the column and make all values for existing rows None // Add the column and make all values for existing rows None
let index = self.columns.len(); let index = self.columns.len();
@ -946,14 +1043,16 @@ impl Table {
builder.add_schema_append(self.partition_id, self.id, val.id, wal_type); builder.add_schema_append(self.partition_id, self.id, val.id, wal_type);
} }
&mut self.columns[index] (&mut self.columns[index], index)
} }
}; };
ensure!( ensure!(
column.matches_type(&val), column.matches_type(&val),
SchemaMismatch { SchemaMismatch {
error: format!("new column type {:?} doesn't match existing type", val) column_id,
existing_column_type: column.type_description(),
inserted_value_type: val.value.type_description(),
} }
); );
} }
@ -974,9 +1073,11 @@ impl Table {
} }
vals.push(Some(*val)) vals.push(Some(*val))
} }
_ => { col => {
return SchemaMismatch { return SchemaMismatch {
error: "passed value is a tag and existing column is not".to_string(), column_id: *idx,
existing_column_type: col.type_description(),
inserted_value_type: "tag",
} }
.fail() .fail()
} }
@ -984,7 +1085,9 @@ impl Table {
Value::FieldValue(field) => match (column, field) { Value::FieldValue(field) => match (column, field) {
(Column::Tag(_), _) => { (Column::Tag(_), _) => {
return SchemaMismatch { return SchemaMismatch {
error: "existing column is a tag and passed value is not".to_string(), column_id: *idx,
existing_column_type: "tag",
inserted_value_type: val.value.type_description(),
} }
.fail() .fail()
} }
@ -1012,7 +1115,14 @@ impl Table {
} }
vals.push(Some(*val)) vals.push(Some(*val))
} }
_ => panic!("yarrr the field didn't match"), (column, _) => {
return SchemaMismatch {
column_id: *idx,
existing_column_type: column.type_description(),
inserted_value_type: val.value.type_description(),
}
.fail()
}
}, },
} }
} }
@ -1320,6 +1430,16 @@ impl Column {
} }
} }
fn type_description(&self) -> &'static str {
match self {
Self::F64(_) => "f64",
Self::I64(_) => "i64",
Self::String(_) => "String",
Self::Bool(_) => "bool",
Self::Tag(_) => "tag",
}
}
// TODO: have type mismatches return helpful error // TODO: have type mismatches return helpful error
fn matches_type(&self, val: &ColumnValue<'_>) -> bool { fn matches_type(&self, val: &ColumnValue<'_>) -> bool {
match (self, &val.value) { match (self, &val.value) {