Merge remote-tracking branch 'origin/main' into dom/wal-write

pull/24376/head
Carol (Nichols || Goulding) 2022-11-30 13:27:28 -05:00
commit b6b8e6ac10
8 changed files with 172 additions and 104 deletions

Cargo.lock generated
View File

@ -6026,7 +6026,6 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-util",
"uuid",
"workspace-hack",
]

View File

@ -451,17 +451,9 @@ impl IOxSessionContext {
let it = ctx.execute_stream(physical_plan).await?;
let series_sets = SeriesSetConverter::default()
SeriesSetConverter::default()
.convert(table_name, tag_columns, field_columns, it)
.await
.map_err(|e| {
Error::Execution(format!(
"Error executing series set conversion: {}",
e
))
})?;
Ok(futures::stream::iter(series_sets).map(|x| Ok(x) as Result<_>))
})
})
.try_flatten()
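
For reference, a minimal standalone sketch (not code from this commit; the values and error type are illustrative) of the `TryStreamExt::try_flatten` pattern used above: the converter now hands back a fallible stream of `SeriesSet`s per plan, and the outer combinator splices those streams into one, short-circuiting on the first error from either level.

use futures::{executor::block_on, stream, TryStreamExt};

fn main() {
    block_on(async {
        // An outer fallible stream whose Ok items are themselves fallible streams.
        let outer = stream::iter(vec![
            Ok::<_, String>(stream::iter(vec![Ok::<_, String>(1), Ok(2)])),
            Ok(stream::iter(vec![Ok(3)])),
        ]);

        // try_flatten yields the inner items in order, stopping at the first error.
        let flat: Vec<i32> = outer.try_flatten().try_collect().await.unwrap();
        assert_eq!(flat, vec![1, 2, 3]);
    });
}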

View File

@ -15,9 +15,8 @@ use datafusion::{
};
use futures::{Stream, StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{OptionExt, Snafu};
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
use crate::exec::{
field::{self, FieldColumns, FieldIndexes},
@ -31,40 +30,11 @@ use super::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Plan Execution Error: {}", source))]
Execution {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[snafu(display(
"Error reading record batch while converting from SeriesSet: {:?}",
source
))]
Reading {
source: datafusion::error::DataFusionError,
},
#[snafu(display(
"Error concatenating record batch while converting from SeriesSet: {:?}",
source
))]
Concatenating { source: arrow::error::ArrowError },
#[snafu(display("Internal field error while converting series set: {}", source))]
InternalField { source: field::Error },
#[snafu(display("Internal error finding grouping colum: {}", column_name))]
FindingGroupColumn { column_name: String },
#[snafu(display("Sending series set results during conversion: {:?}", source))]
SendingDuringConversion {
source: Box<SendError<Result<SeriesSet>>>,
},
#[snafu(display("Sending grouped series set results during conversion: {:?}", source))]
SendingDuringGroupedConversion {
source: Box<SendError<Result<SeriesSet>>>,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -93,26 +63,54 @@ impl SeriesSetConverter {
tag_columns: Arc<Vec<Arc<str>>>,
field_columns: FieldColumns,
it: SendableRecordBatchStream,
) -> Result<Vec<SeriesSet>, Error> {
) -> Result<impl Stream<Item = Result<SeriesSet, DataFusionError>>, DataFusionError> {
assert_eq!(
tag_columns.as_ref(),
&{
let mut tmp = tag_columns.as_ref().clone();
tmp.sort();
tmp
},
"Tag column sorted",
);
let schema = it.schema();
// for now, this logic only handles a single `RecordBatch` so
// concat data together.
//
// proper streaming support tracked by:
// https://github.com/influxdata/influxdb_iox/issues/4445
let batches = collect(it).await.context(ReadingSnafu)?;
let batches = collect(it).await.map_err(|e| {
DataFusionError::Context(
"Error reading record batch while converting from SeriesSet".to_string(),
Box::new(e),
)
})?;
let batch = if !batches.is_empty() {
compute::concat_batches(&batches[0].schema(), &batches).context(ConcatenatingSnafu)?
} else {
return Ok(vec![]);
};
let batch = compute::concat_batches(&schema, &batches).map_err(|e| {
DataFusionError::Context(
"Error concatenating record batch while converting from SeriesSet".to_string(),
Box::new(DataFusionError::ArrowError(e)),
)
})?;
if batch.num_rows() == 0 {
return Ok(futures::stream::empty().boxed());
}
let schema = batch.schema();
// TODO: check that the tag columns are sorted by tag name...
let tag_indexes =
FieldIndexes::names_to_indexes(&schema, &tag_columns).context(InternalFieldSnafu)?;
let field_indexes = FieldIndexes::from_field_columns(&schema, &field_columns)
.context(InternalFieldSnafu)?;
let tag_indexes = FieldIndexes::names_to_indexes(&schema, &tag_columns).map_err(|e| {
DataFusionError::Context(
"Internal field error while converting series set".to_string(),
Box::new(DataFusionError::External(Box::new(e))),
)
})?;
let field_indexes =
FieldIndexes::from_field_columns(&schema, &field_columns).map_err(|e| {
DataFusionError::Context(
"Internal field error while converting series set".to_string(),
Box::new(DataFusionError::External(Box::new(e))),
)
})?;
// Algorithm: compute, via bitsets, the rows at which each
// tag column changes and thereby where the tagset
@ -148,7 +146,7 @@ impl SeriesSetConverter {
// call await during the loop)
// emit each series
let series_sets = intersections
let series_sets: Vec<_> = intersections
.into_iter()
.map(|end_row| {
let series_set = SeriesSet {
@ -163,7 +161,7 @@ impl SeriesSetConverter {
})
.collect();
Ok(series_sets)
Ok(futures::stream::iter(series_sets).map(Ok).boxed())
}
/// returns a bitset with all row indexes where the value of the
@ -777,6 +775,9 @@ mod tests {
.convert(table_name, tag_columns, field_columns, it)
.await
.expect("Conversion happened without error")
.try_collect()
.await
.expect("Conversion happened without error")
}
/// Test helper: parses the csv content into a single record batch arrow
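
For reference, a minimal sketch (the `InternalError` type and `wrap` helper are illustrative, not from this commit) of the error-mapping pattern the converter now uses in place of its removed snafu variants: a crate-local error is boxed into `DataFusionError::External`, then labelled with a human-readable `DataFusionError::Context` string.

use datafusion::error::DataFusionError;
use std::fmt;

#[derive(Debug)]
struct InternalError;

impl fmt::Display for InternalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "some internal conversion error")
    }
}

impl std::error::Error for InternalError {}

// Wrap a crate-local error the way the converter now does: first as an
// opaque External error, then with a descriptive Context message.
fn wrap(e: InternalError) -> DataFusionError {
    DataFusionError::Context(
        "Internal field error while converting series set".to_string(),
        Box::new(DataFusionError::External(Box::new(e))),
    )
}

fn main() {
    println!("{}", wrap(InternalError));
}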

View File

@ -22,7 +22,6 @@ snafu = "0.7"
snap = "1.0.0"
tokio = { version = "1.21", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = "0.7"
uuid = "1.2"
workspace-hack = { path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order

View File

@ -1,4 +1,4 @@
use crate::{FileTypeIdentifier, SegmentEntry, SequencedWalOp};
use crate::{FileTypeIdentifier, SegmentEntry, SegmentIdBytes, SequencedWalOp};
use byteorder::{BigEndian, ReadBytesExt};
use crc32fast::Hasher;
use generated_types::influxdata::iox::wal::v1::SequencedWalOp as ProtoSequencedWalOp;
@ -38,7 +38,7 @@ where
Ok(data)
}
pub fn read_header(&mut self) -> Result<(FileTypeIdentifier, uuid::Bytes)> {
pub fn read_header(&mut self) -> Result<(FileTypeIdentifier, SegmentIdBytes)> {
Ok((self.read_array()?, self.read_array()?))
}
@ -333,7 +333,7 @@ mod tests {
impl FakeSegmentFile {
fn new() -> Self {
Self {
id: SegmentId::new(),
id: SegmentId::new(0),
entries: Default::default(),
}
}
@ -348,7 +348,7 @@ mod tests {
f.write_all(FILE_TYPE_IDENTIFIER).unwrap();
let id_bytes = self.id.as_bytes();
f.write_all(id_bytes).unwrap();
f.write_all(&id_bytes).unwrap();
for entry in &self.entries {
f.write_u32::<BigEndian>(entry.checksum()).unwrap();
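
For reference, a hedged sketch (constants and the helper name are illustrative, not IOx code) of the header layout the reader and the fake segment file above agree on: the file-type identifier followed by the segment id as 8 big-endian bytes. The 8-byte identifier length is an inference from the size assertions later in this commit dropping from 24 (8 + 16-byte UUID) to 16 (8 + 8-byte id).

use std::io::Read;

fn read_segment_id(mut r: impl Read) -> std::io::Result<u64> {
    // File-type identifier first; 8 bytes is an assumption, see note above.
    let mut file_type = [0u8; 8];
    r.read_exact(&mut file_type)?;

    // Then the segment id as SegmentIdBytes, i.e. 8 big-endian bytes.
    let mut id_bytes = [0u8; 8];
    r.read_exact(&mut id_bytes)?;
    Ok(u64::from_be_bytes(id_bytes))
}

fn main() -> std::io::Result<()> {
    // Build a fake header in memory: placeholder identifier bytes, then id 42.
    let mut header = Vec::new();
    header.extend_from_slice(b"TESTWAL0"); // placeholder, not the real identifier
    header.extend_from_slice(&42u64.to_be_bytes());
    assert_eq!(read_segment_id(header.as_slice())?, 42);
    Ok(())
}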

View File

@ -7,6 +7,10 @@ use std::{
io::{self, Write},
mem, num,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
pub struct OpenSegmentFileWriter {
@ -17,8 +21,11 @@ pub struct OpenSegmentFileWriter {
}
impl OpenSegmentFileWriter {
pub fn new_in_directory(dir: impl Into<PathBuf>) -> Result<Self> {
let id = SegmentId::new();
pub fn new_in_directory(
dir: impl Into<PathBuf>,
next_id_source: Arc<AtomicU64>,
) -> Result<Self> {
let id = SegmentId::new(next_id_source.fetch_add(1, Ordering::Relaxed));
let path = crate::build_segment_path(dir, id);
let mut f = OpenOptions::new()
@ -32,7 +39,7 @@ impl OpenSegmentFileWriter {
let file_type_bytes_written = FILE_TYPE_IDENTIFIER.len();
let id_bytes = id.as_bytes();
f.write_all(id_bytes).context(SegmentWriteIdSnafu)?;
f.write_all(&id_bytes).context(SegmentWriteIdSnafu)?;
let id_bytes_written = id_bytes.len();
f.sync_all().expect("fsync failure");
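
For reference, a minimal standalone sketch (not IOx code) of the id-allocation scheme introduced here: writers share one `AtomicU64` and take ids with `fetch_add(1, Ordering::Relaxed)`, so each new segment file gets the next number in sequence.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

fn main() {
    let next_id_source = Arc::new(AtomicU64::new(0));

    // Each call returns the current value and bumps the shared counter by one.
    let a = next_id_source.fetch_add(1, Ordering::Relaxed);
    let b = next_id_source.fetch_add(1, Ordering::Relaxed);

    assert_eq!((a, b), (0, 1));
}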

View File

@ -21,9 +21,13 @@ use generated_types::{
};
use prost::Message;
use snafu::prelude::*;
use std::{collections::HashMap, io, path::PathBuf};
use std::{
collections::BTreeMap,
io,
path::PathBuf,
sync::{atomic::AtomicU64, Arc},
};
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
mod blocking;
@ -91,9 +95,9 @@ pub enum Error {
source: blocking::ReaderError,
},
InvalidUuid {
InvalidId {
filename: String,
source: uuid::Error,
source: std::num::ParseIntError,
},
SegmentNotFound {
@ -114,28 +118,29 @@ pub enum Error {
/// A specialized `Result` for WAL-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Segments are identified by a type 4 UUID
/// Segments are identified by a u64 that indicates file ordering
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SegmentId(Uuid);
pub struct SegmentId(u64);
pub type SegmentIdBytes = [u8; 8];
#[allow(missing_docs)]
impl SegmentId {
pub fn new() -> Self {
Self(Uuid::new_v4())
pub fn new(v: u64) -> Self {
Self(v)
}
pub fn get(&self) -> Uuid {
pub fn get(&self) -> u64 {
self.0
}
pub fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
pub fn as_bytes(&self) -> SegmentIdBytes {
self.0.to_be_bytes()
}
}
impl From<Uuid> for SegmentId {
fn from(uuid: Uuid) -> Self {
Self(uuid)
pub fn from_bytes(bytes: SegmentIdBytes) -> Self {
let v = u64::from_be_bytes(bytes);
Self::new(v)
}
}
@ -145,13 +150,6 @@ impl std::fmt::Display for SegmentId {
}
}
impl Default for SegmentId {
fn default() -> Self {
Self::new()
}
}
// TODO: find a better name
pub(crate) fn build_segment_path(dir: impl Into<PathBuf>, id: SegmentId) -> PathBuf {
let mut path = dir.into();
path.push(id.to_string());
@ -179,8 +177,9 @@ const SEGMENT_FILE_EXTENSION: &str = "dat";
#[derive(Debug)]
pub struct Wal {
root: PathBuf,
closed_segments: RwLock<HashMap<SegmentId, ClosedSegment>>,
closed_segments: RwLock<BTreeMap<SegmentId, ClosedSegment>>,
open_segment: OpenSegmentFile,
next_id_source: Arc<AtomicU64>,
}
impl Wal {
@ -203,7 +202,9 @@ impl Wal {
.await
.context(UnableToReadDirectoryContentsSnafu { path: &root })?;
let mut closed_segments = HashMap::new();
// Closed segments must be ordered by ID, which is the order they were written in and the
// order they should be replayed in.
let mut closed_segments = BTreeMap::new();
while let Some(child) = dir
.next_entry()
@ -222,9 +223,7 @@ impl Wal {
let filename = filename
.to_str()
.expect("WAL files created by IOx should be named with valid UTF-8");
let id = Uuid::parse_str(filename)
.context(InvalidUuidSnafu { filename })?
.into();
let id = SegmentId::new(filename.parse().context(InvalidIdSnafu { filename })?);
let segment = ClosedSegment {
id,
path: child.path(),
@ -234,12 +233,21 @@ impl Wal {
}
}
let open_segment = OpenSegmentFile::new_in_directory(&root).await?;
let next_id = closed_segments
.keys()
.last()
.copied()
.map(|id| id.get() + 1)
.unwrap_or(0);
let next_id_source = Arc::new(AtomicU64::new(next_id));
let open_segment =
OpenSegmentFile::new_in_directory(&root, Arc::clone(&next_id_source)).await?;
Ok(Self {
root,
closed_segments: RwLock::new(closed_segments),
open_segment,
next_id_source,
})
}
@ -314,11 +322,16 @@ impl<'a> WalRotator<'a> {
(),
)
.await?;
self.0
let previous_value = self
.0
.closed_segments
.write()
.await
.insert(closed.id, closed.clone());
assert!(
previous_value.is_none(),
"Should always add new closed segment entries, not replace"
);
Ok(closed)
}
@ -407,11 +420,16 @@ struct OpenSegmentFile {
}
impl OpenSegmentFile {
async fn new_in_directory(dir: impl Into<PathBuf>) -> Result<Self> {
async fn new_in_directory(
dir: impl Into<PathBuf>,
next_id_source: Arc<AtomicU64>,
) -> Result<Self> {
let dir = dir.into();
let dir_for_closure = dir.clone();
let (tx, rx) = mpsc::channel(10);
let task = tokio::task::spawn_blocking(move || Self::task_main(rx, dir_for_closure));
let task = tokio::task::spawn_blocking(move || {
Self::task_main(rx, dir_for_closure, next_id_source)
});
std::fs::File::open(&dir)
.context(OpenSegmentDirectorySnafu { path: dir })?
.sync_all()
@ -422,8 +440,16 @@ impl OpenSegmentFile {
fn task_main(
mut rx: tokio::sync::mpsc::Receiver<OpenSegmentFileWriterRequest>,
dir: PathBuf,
next_id_source: Arc<AtomicU64>,
) -> Result<()> {
let new_writ = || Ok(blocking::OpenSegmentFileWriter::new_in_directory(&dir).unwrap());
let new_writ =
|| {
Ok(blocking::OpenSegmentFileWriter::new_in_directory(
&dir,
Arc::clone(&next_id_source),
)
.unwrap())
};
let mut open_write = new_writ()?;
while let Some(req) = rx.blocking_recv() {
@ -471,7 +497,7 @@ impl OpenSegmentFile {
#[derive(Debug)]
enum ClosedSegmentFileReaderRequest {
ReadHeader(oneshot::Sender<blocking::ReaderResult<(FileTypeIdentifier, uuid::Bytes)>>),
ReadHeader(oneshot::Sender<blocking::ReaderResult<(FileTypeIdentifier, SegmentIdBytes)>>),
NextOps(oneshot::Sender<blocking::ReaderResult<Option<SequencedWalOp>>>),
}
@ -500,8 +526,7 @@ impl ClosedSegmentFileReader {
SegmentFileIdentifierMismatchSnafu,
);
let id = Uuid::from_bytes(id);
let id = SegmentId::from(id);
let id = SegmentId::from_bytes(id);
Ok(Self { id, tx, task })
}
@ -589,7 +614,10 @@ mod tests {
#[tokio::test]
async fn segment_file_write_and_read_ops() {
let dir = test_helpers::tmp_dir().unwrap();
let segment = OpenSegmentFile::new_in_directory(dir.path()).await.unwrap();
let next_id_source = Arc::new(AtomicU64::new(0));
let segment = OpenSegmentFile::new_in_directory(dir.path(), next_id_source)
.await
.unwrap();
let writer = segment.write_handle();
let w1 = test_data("m1,t=foo v=1i 1");
@ -659,7 +687,7 @@ mod tests {
// No writes, but rotating is totally fine
let wal_rotator = wal.rotation_handle().await;
let closed_segment_details = wal_rotator.rotate().await.unwrap();
assert_eq!(closed_segment_details.size(), 24);
assert_eq!(closed_segment_details.size(), 16);
// There's one closed segment
let closed = wal_reader.closed_segments().await;
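
For reference, a minimal sketch (not IOx code) of the two properties the new `SegmentId` relies on: a `u64` round-trips through its 8 big-endian bytes unchanged, and seeding the counter with the highest existing id plus one keeps replayed and newly created segments in a single ordering.

fn main() {
    let id: u64 = 7;
    let bytes: [u8; 8] = id.to_be_bytes(); // what SegmentId::as_bytes writes
    assert_eq!(u64::from_be_bytes(bytes), 7); // what SegmentId::from_bytes recovers

    // Seeding the counter during replay, mirroring the logic above.
    let existing_ids = [0u64, 1, 2];
    let next_id = existing_ids.iter().copied().max().map(|id| id + 1).unwrap_or(0);
    assert_eq!(next_id, 3);
}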

View File

@ -27,13 +27,13 @@ async fn crud() {
// Can write an entry to the open segment
let op = arbitrary_sequenced_wal_op(42);
let summary = open.write_op(op).await.unwrap();
assert_eq!(summary.total_bytes, 130);
assert_eq!(summary.total_bytes, 122);
assert_eq!(summary.bytes_written, 106);
// Can write another entry; total_bytes accumulates
let op = arbitrary_sequenced_wal_op(43);
let summary = open.write_op(op).await.unwrap();
assert_eq!(summary.total_bytes, 236);
assert_eq!(summary.total_bytes, 228);
assert_eq!(summary.bytes_written, 106);
// Still no closed segments
@ -47,7 +47,7 @@ async fn crud() {
// Can't read entries from the open segment; have to rotate first
let wal_rotator = wal.rotation_handle().await;
let closed_segment_details = wal_rotator.rotate().await.unwrap();
assert_eq!(closed_segment_details.size(), 236);
assert_eq!(closed_segment_details.size(), 228);
// There's one closed segment
let closed = wal_reader.closed_segments().await;
@ -121,6 +121,48 @@ async fn replay() {
assert_eq!(op.sequence_number, 43);
}
#[tokio::test]
async fn ordering() {
let dir = test_helpers::tmp_dir().unwrap();
// Create a WAL with two closed segments and an open segment with entries, then drop the WAL
{
let wal = wal::Wal::new(dir.path()).await.unwrap();
let open = wal.write_handle().await;
let wal_rotator = wal.rotation_handle().await;
let op = arbitrary_sequenced_wal_op(42);
open.write_op(op).await.unwrap();
wal_rotator.rotate().await.unwrap();
let op = arbitrary_sequenced_wal_op(43);
open.write_op(op).await.unwrap();
wal_rotator.rotate().await.unwrap();
let op = arbitrary_sequenced_wal_op(44);
open.write_op(op).await.unwrap();
}
// Create a new WAL instance with the same directory to replay from the files
let wal = wal::Wal::new(dir.path()).await.unwrap();
let wal_reader = wal.read_handle();
let wal_rotator = wal.rotation_handle().await;
// There are 3 segments (from the 2 closed and 1 open) and they're in the order they were
// created
let closed = wal_reader.closed_segments().await;
let closed_segment_ids: Vec<_> = closed.iter().map(|c| c.id().get()).collect();
assert_eq!(closed_segment_ids, &[0, 1, 2]);
// The open segment is next in order
let closed_segment_details = wal_rotator.rotate().await.unwrap();
assert_eq!(closed_segment_details.id().get(), 3);
// Files created after replay come later in the ordering
let closed_segment_details = wal_rotator.rotate().await.unwrap();
assert_eq!(closed_segment_details.id().get(), 4);
}
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {
let w = test_data("m1,t=foo v=1i 1");
SequencedWalOp {