From 5fe3bfd53c4c426bbcffabd80aa483b93d4a4050 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 18 Sep 2020 07:47:37 -0400 Subject: [PATCH] refactor: extract WalDetails into delorean_wal_writer crate (#297) --- Cargo.lock | 14 +++ Cargo.toml | 2 + delorean_wal_writer/Cargo.toml | 17 +++ delorean_wal_writer/src/lib.rs | 154 ++++++++++++++++++++++++ src/storage/partitioned_store.rs | 167 +++++---------------------- src/storage/write_buffer_database.rs | 11 +- 6 files changed, 216 insertions(+), 149 deletions(-) create mode 100644 delorean_wal_writer/Cargo.toml create mode 100644 delorean_wal_writer/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 15d2e86ef7..62aaa06b41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -648,6 +648,7 @@ dependencies = [ "delorean_test_helpers", "delorean_tsm", "delorean_wal", + "delorean_wal_writer", "dirs 2.0.2", "dotenv", "either", @@ -813,6 +814,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "delorean_wal_writer" +version = "0.1.0" +dependencies = [ + "delorean_wal", + "futures", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", +] + [[package]] name = "difference" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index 2416220fd2..0ef1d8c46f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "delorean_test_helpers", "delorean_tsm", "delorean_wal", + "delorean_wal_writer", "influxdb2_client", ] @@ -31,6 +32,7 @@ delorean_parquet = { path = "delorean_parquet" } delorean_table = { path = "delorean_table" } delorean_table_schema = { path = "delorean_table_schema" } delorean_wal = { path = "delorean_wal" } +delorean_wal_writer = { path = "delorean_wal_writer" } delorean_object_store = { path = "delorean_object_store" } delorean_tsm = { path = "delorean_tsm" } diff --git a/delorean_wal_writer/Cargo.toml b/delorean_wal_writer/Cargo.toml new file mode 100644 index 0000000000..06ca489424 --- /dev/null +++ b/delorean_wal_writer/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "delorean_wal_writer" +version = "0.1.0" +authors = ["Andrew Lamb "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +delorean_wal = { path = "../delorean_wal" } + +tokio = { version = "0.2", features = ["full"] } +serde_json = "1.0.44" +serde = { version = "1.0", features = ["derive"] } +futures = "0.3.1" +tracing = "0.1" +snafu = "0.6.2" diff --git a/delorean_wal_writer/src/lib.rs b/delorean_wal_writer/src/lib.rs new file mode 100644 index 0000000000..321041e2bc --- /dev/null +++ b/delorean_wal_writer/src/lib.rs @@ -0,0 +1,154 @@ +use delorean_wal::{Error as WalError, SequenceNumber, WalBuilder, WritePayload}; + +use futures::{channel::mpsc, SinkExt, StreamExt}; +use snafu::{ResultExt, Snafu}; + +use serde::{Deserialize, Serialize}; +use tokio::task; +use tracing::{error, info}; + +use std::path::PathBuf; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Wal Writer error using WAL: {}", source))] + UnderlyingWalError { source: WalError }, + + #[snafu(display("Error serializing metadata: {}", source))] + SerializeMetadata { source: serde_json::error::Error }, + + #[snafu(display("Error writing to WAL: {}", source))] + WrtitingToWal { source: std::io::Error }, + + #[snafu(display("Error writing metadata to '{:?}': {}", metadata_path, source))] + WritingMetadata { + metadata_path: PathBuf, + source: std::io::Error, + }, +} + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub struct WalDetails { + pub metadata_path: PathBuf, + pub metadata: WalMetadata, + pub write_tx: mpsc::Sender, +} + +#[derive(Debug)] +pub struct WalWrite { + payload: WritePayload, + notify_tx: mpsc::Sender>, +} + +impl WalDetails { + pub async fn write_metadata(&self) -> Result<()> { + Ok(tokio::fs::write( + self.metadata_path.clone(), + serde_json::to_string(&self.metadata).context(SerializeMetadata)?, + ) + .await + .context(WritingMetadata { + metadata_path: &self.metadata_path, + })?) + } + + pub async fn write_and_sync(&self, data: Vec) -> Result<()> { + let payload = WritePayload::new(data).context(UnderlyingWalError {})?; + + let (notify_tx, mut notify_rx) = mpsc::channel(1); + + let write = WalWrite { payload, notify_tx }; + + let mut tx = self.write_tx.clone(); + tx.send(write) + .await + .expect("The WAL thread should always be running to receive a write"); + + let _ = notify_rx + .next() + .await + .expect("The WAL thread should always be running to send a response.") + .context(UnderlyingWalError {})?; + + Ok(()) + } +} + +/// Metadata about this particular WAL +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] +pub struct WalMetadata { + pub format: WalFormat, +} + +impl Default for WalMetadata { + fn default() -> Self { + Self { + format: WalFormat::FlatBuffers, + } + } +} + +/// Supported WAL formats that can be restored +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] +pub enum WalFormat { + FlatBuffers, + #[serde(other)] + Unknown, +} + +pub async fn start_wal_sync_task(wal_builder: WalBuilder) -> Result { + let mut wal = wal_builder.wal().context(UnderlyingWalError)?; + + let metadata = tokio::fs::read_to_string(wal.metadata_path()) + .await + .and_then(|raw_metadata| { + serde_json::from_str::(&raw_metadata).map_err(Into::into) + }) + .unwrap_or_default(); + let metadata_path = wal.metadata_path(); + + let (write_tx, mut write_rx) = mpsc::channel::(100); + + tokio::spawn({ + async move { + loop { + match write_rx.next().await { + Some(write) => { + let payload = write.payload; + let mut tx = write.notify_tx; + + let result = task::block_in_place(|| { + let seq = wal.append(payload)?; + wal.sync_all()?; + Ok(seq) + }); + + if let Err(e) = tx.send(result).await { + error!("error sending result back to writer {:?}", e); + } + } + None => { + info!("shutting down WAL for {:?}", wal.metadata_path()); + return; + } + } + } + } + }); + + Ok(WalDetails { + metadata_path, + metadata, + write_tx, + }) +} + +#[cfg(test)] +mod tests { + #[test] + fn it_works_but_has_no_tests() { + // :thinkin_face: + } +} diff --git a/src/storage/partitioned_store.rs b/src/storage/partitioned_store.rs index 15adc2c761..bf821127d4 100644 --- a/src/storage/partitioned_store.rs +++ b/src/storage/partitioned_store.rs @@ -9,13 +9,10 @@ use crate::storage::{ ReadPoint, SeriesDataType, }; -use delorean_wal::{Error as WalError, SequenceNumber, WalBuilder, WritePayload}; -use futures::{ - channel::mpsc, - stream::{BoxStream, Stream}, - SinkExt, StreamExt, -}; -use serde::{Deserialize, Serialize}; +use delorean_wal::{Error as WalError, WalBuilder}; +use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails, WalFormat}; + +use futures::stream::{BoxStream, Stream}; use snafu::{ResultExt, Snafu}; use std::{ cmp::Ordering, @@ -25,8 +22,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::task; -use tracing::{debug, error, info}; +use tracing::debug; #[derive(Debug, Snafu)] pub enum Error { @@ -36,20 +32,11 @@ pub enum Error { #[snafu(display("Partition error with WAL: {}", source))] UnderlyingWalError { source: WalError }, - #[snafu(display("Partition error writing to WAL: {}", source))] - WrtitingToWal { source: std::io::Error }, + #[snafu(display("Partition error with WAL Writer: {}", source))] + UnderlyingWalWriterError { source: WalWriterError }, #[snafu(display("Partition error with MemDB: {}", source))] UnderlyingMemDBError { source: MemDBError }, - - #[snafu(display("Error serializing metadata: {}", source))] - SerializeMetadata { source: serde_json::error::Error }, - - #[snafu(display("Error writing metadata to '{:?}': {}", metadata_path, source))] - WritingMetadata { - metadata_path: PathBuf, - source: std::io::Error, - }, } pub type Result = std::result::Result; @@ -78,53 +65,6 @@ pub struct Partition { wal_details: Option, } -#[derive(Debug)] -pub struct WalDetails { - metadata_path: PathBuf, - metadata: WalMetadata, - write_tx: mpsc::Sender, -} - -#[derive(Debug)] -struct WalWrite { - payload: WritePayload, - notify_tx: mpsc::Sender>, -} - -impl WalDetails { - pub async fn write_metadata(&self) -> Result<()> { - Ok(tokio::fs::write( - self.metadata_path.clone(), - serde_json::to_string(&self.metadata).context(SerializeMetadata)?, - ) - .await - .context(WritingMetadata { - metadata_path: &self.metadata_path, - })?) - } - - pub async fn write_and_sync(&self, data: Vec) -> Result<()> { - let payload = WritePayload::new(data).context(UnderlyingWalError {})?; - - let (notify_tx, mut notify_rx) = mpsc::channel(1); - - let write = WalWrite { payload, notify_tx }; - - let mut tx = self.write_tx.clone(); - tx.send(write) - .await - .expect("The WAL thread should always be running to receive a write"); - - let _ = notify_rx - .next() - .await - .expect("The WAL thread should always be running to send a response.") - .context(UnderlyingWalError {})?; - - Ok(()) - } -} - impl Partition { pub fn new_without_wal(store: PartitionStore) -> Self { Self { @@ -135,8 +75,14 @@ impl Partition { pub async fn new_with_wal(store: PartitionStore, wal_dir: PathBuf) -> Result { let wal_builder = WalBuilder::new(wal_dir); - let wal_details = start_wal_sync_task(wal_builder).await?; - wal_details.write_metadata().await?; + let wal_details = start_wal_sync_task(wal_builder) + .await + .context(UnderlyingWalWriterError)?; + + wal_details + .write_metadata() + .await + .context(UnderlyingWalWriterError)?; Ok(Self { store, @@ -148,7 +94,10 @@ impl Partition { let partition_id = bucket_name.to_string(); let mut db = MemDB::new(partition_id); let wal_builder = WalBuilder::new(bucket_dir); - let wal_details = start_wal_sync_task(wal_builder.clone()).await?; + let wal_details = start_wal_sync_task(wal_builder.clone()) + .await + .context(UnderlyingWalWriterError)?; + debug!("Wal details {:?}", wal_details); match wal_details.metadata.format { @@ -185,7 +134,10 @@ impl Partition { } let store = PartitionStore::MemDB(Box::new(db)); - wal_details.write_metadata().await?; + wal_details + .write_metadata() + .await + .context(UnderlyingWalWriterError)?; Ok(Self { store, @@ -201,7 +153,9 @@ impl Partition { let flatbuffer = points_to_flatbuffer(&points); let (mut data, idx) = flatbuffer.collapse(); let data = data.split_off(idx); - wal.write_and_sync(data).await?; + wal.write_and_sync(data) + .await + .context(UnderlyingWalWriterError)?; } match &mut self.store { @@ -331,75 +285,6 @@ impl Partition { } } -pub async fn start_wal_sync_task(wal_builder: WalBuilder) -> Result { - let mut wal = wal_builder.wal().context(UnderlyingWalError)?; - - let metadata = tokio::fs::read_to_string(wal.metadata_path()) - .await - .and_then(|raw_metadata| { - serde_json::from_str::(&raw_metadata).map_err(Into::into) - }) - .unwrap_or_default(); - let metadata_path = wal.metadata_path(); - - let (write_tx, mut write_rx) = mpsc::channel::(100); - - tokio::spawn({ - async move { - loop { - match write_rx.next().await { - Some(write) => { - let payload = write.payload; - let mut tx = write.notify_tx; - - let result = task::block_in_place(|| { - let seq = wal.append(payload)?; - wal.sync_all()?; - Ok(seq) - }); - - if let Err(e) = tx.send(result).await { - error!("error sending result back to writer {:?}", e); - } - } - None => { - info!("shutting down WAL for {:?}", wal.metadata_path()); - return; - } - } - } - } - }); - - Ok(WalDetails { - metadata_path, - metadata, - write_tx, - }) -} - -/// Metadata about this particular WAL -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] -pub struct WalMetadata { - format: WalFormat, -} - -impl Default for WalMetadata { - fn default() -> Self { - Self { - format: WalFormat::FlatBuffers, - } - } -} - -/// Supported WAL formats that can be restored -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] -enum WalFormat { - FlatBuffers, - #[serde(other)] - Unknown, -} - fn points_to_flatbuffer(points: &[PointType]) -> flatbuffers::FlatBufferBuilder<'_> { let mut builder = flatbuffers::FlatBufferBuilder::new_with_capacity(1024); diff --git a/src/storage/write_buffer_database.rs b/src/storage/write_buffer_database.rs index 568df2d5cb..e44965f681 100644 --- a/src/storage/write_buffer_database.rs +++ b/src/storage/write_buffer_database.rs @@ -2,12 +2,10 @@ use tonic::async_trait; use tracing::info; use crate::generated_types::wal as wb; -use crate::storage::partitioned_store::{ - start_wal_sync_task, Error as PartitionedStoreError, WalDetails, -}; use delorean_line_parser::{FieldValue, ParsedLine}; use delorean_wal::WalBuilder; +use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::convert::TryFrom; @@ -54,22 +52,19 @@ pub enum Error { #[snafu(display("Error reading metadata: {}", source))] ReadMetadataError { source: std::io::Error }, - #[snafu(display("Partition error writing to WAL: {}", source))] - WritingToWal { source: std::io::Error }, - #[snafu(display("Dir {:?} invalid for DB", dir))] OpenDb { dir: PathBuf }, #[snafu(display("Error opening WAL for database {}: {}", database, source))] OpeningWal { database: String, - source: PartitionedStoreError, + source: WalWriterError, }, #[snafu(display("Error writing to WAL for database {}: {}", database, source))] WritingWal { database: String, - source: PartitionedStoreError, + source: WalWriterError, }, #[snafu(display("Error opening WAL for database {}: {}", database, source))]