feat: add segmenting and wal persistence to WriteBuffer (#24624)

* refactor: move write buffer into its own dir

* feat: implement write buffer segment with wal flushing

This creates the WriteBufferFlusher and OpenBufferSegment. If a WAL is passed into the buffer, data written into it is persisted to the WAL for the initialized segment ID.

* refactor: use crossbeam in flusher and PR cleanup
Paul Dix 2024-02-12 12:36:10 -05:00 committed by GitHub
parent b555ddf18b
commit 4d9095e58d
10 changed files with 868 additions and 215 deletions
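A minimal sketch of the constructor shape this commit introduces, pieced together from the serve command and write-buffer test changes below; the catalog setup, the wal_dir parameter, and the build_write_buffer helper are illustrative, not part of the diff.

use std::sync::Arc;

use influxdb3_write::catalog::Catalog;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentId;

fn build_write_buffer(
    wal_dir: Option<std::path::PathBuf>,
) -> Result<Arc<WriteBufferImpl<WalImpl>>, Box<dyn std::error::Error>> {
    let catalog = Arc::new(Catalog::new());
    // With a WAL directory configured, every buffered write is persisted to the
    // segment file; without one, the buffer falls back to a no-op segment writer.
    let wal = wal_dir.map(WalImpl::new).transpose()?.map(Arc::new);
    // Per the TODO in the serve command, the next segment ID should eventually be
    // loaded from the persister; 0 is hard-coded for now.
    let write_buffer = WriteBufferImpl::new(catalog, wal, SegmentId::new(0))?;
    Ok(Arc::new(write_buffer))
}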

Cargo.lock generated

@ -2683,16 +2683,17 @@ name = "influxdb3_write"
version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"async-trait",
"byteorder",
"bytes",
"chrono",
"crc32fast",
"crossbeam-channel",
"data_types",
"datafusion",
"futures-util",
"influxdb-line-protocol",
"iox_catalog",
"iox_query",
"object_store",
"observability_deps",


@ -11,6 +11,7 @@ use influxdb3_server::{query_executor::QueryExecutorImpl, serve, CommonServerSta
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentId;
use iox_query::exec::{Executor, ExecutorConfig};
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
@ -48,6 +49,9 @@ pub enum Error {
#[error("Wal error: {0}")]
Wal(#[from] influxdb3_write::wal::Error),
#[error("Write buffer error: {0}")]
WriteBuffer(#[from] influxdb3_write::write_buffer::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -237,7 +241,12 @@ pub async fn command(config: Config) -> Result<()> {
.wal_directory
.map(|dir| WalImpl::new(dir).map(Arc::new))
.transpose()?;
let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&catalog), wal));
// TODO: the next segment ID should be loaded from the persister
let write_buffer = Arc::new(WriteBufferImpl::new(
Arc::clone(&catalog),
wal,
SegmentId::new(0),
)?);
let query_executor = QueryExecutorImpl::new(
catalog,
Arc::clone(&write_buffer),


@ -165,6 +165,7 @@ mod tests {
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response};
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::SegmentId;
use iox_query::exec::{Executor, ExecutorConfig};
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
@ -200,10 +201,14 @@ mod tests {
mem_pool_size: usize::MAX,
}));
let write_buffer = Arc::new(influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&catalog),
None::<Arc<influxdb3_write::wal::WalImpl>>,
));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&catalog),
None::<Arc<influxdb3_write::wal::WalImpl>>,
SegmentId::new(0),
)
.unwrap(),
);
let query_executor = crate::query_executor::QueryExecutorImpl::new(
catalog,
Arc::clone(&write_buffer),


@ -8,7 +8,6 @@ license.workspace = true
[dependencies]
data_types = { path = "../data_types" }
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
object_store.workspace = true
observability_deps = { path = "../observability_deps" }
@ -21,6 +20,7 @@ async-trait = "0.1"
byteorder = "1.3.4"
chrono = "0.4"
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { workspace = true }
parking_lot = "0.11.1"
parquet = { workspace = true }
@ -33,5 +33,6 @@ bytes = "1.5.0"
futures-util = "0.3.30"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
test_helpers = { path = "../test_helpers" }


@ -19,6 +19,8 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub const TIME_COLUMN_NAME: &str = "time";
#[derive(Debug)]
pub struct Catalog {
inner: RwLock<InnerCatalog>,


@ -201,10 +201,10 @@ pub trait Persister: Debug + Send + Sync + 'static {
pub trait Wal: Debug + Send + Sync + 'static {
/// Opens a writer to a segment, either creating a new file or appending to an existing file.
fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentWriter>;
fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<Box<dyn WalSegmentWriter>>;
/// Opens a reader to a segment file.
fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentReader>;
fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<Box<dyn WalSegmentReader>>;
/// Checks the WAL directory for any segment files and returns them.
fn segment_files(&self) -> wal::Result<Vec<SegmentFile>>;
@ -225,6 +225,8 @@ pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
fn id(&self) -> SegmentId;
fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
fn last_sequence_number(&self) -> SequenceNumber;
}
pub trait WalSegmentReader: Debug + Send + Sync + 'static {
@ -234,7 +236,7 @@ pub trait WalSegmentReader: Debug + Send + Sync + 'static {
}
/// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct WalOpBatch {
pub sequence_number: SequenceNumber,
pub ops: Vec<WalOp>,
@ -255,7 +257,7 @@ pub enum WalOp {
pub struct LpWriteOp {
pub db_name: String,
pub lp: String,
pub default_time: u64,
pub default_time: i64,
}
/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
@ -278,6 +280,7 @@ pub struct BufferedWriteRequest {
pub tag_count: usize,
pub total_buffer_memory_used: usize,
pub segment_id: SegmentId,
pub sequence_number: SequenceNumber,
}
/// A persisted Catalog that contains the database, table, and column schemas.

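The Wal trait change above (returning Box<dyn WalSegmentWriter> / Box<dyn WalSegmentReader> instead of impl Trait) is what lets the open buffer segment own its writer as a trait object, so a real file-backed writer and the no-op writer added below can sit behind the same field. A standalone sketch with toy trait names, not the crate's API:

// Returning `impl Trait` pins the return type to one concrete writer per impl;
// boxing lets the caller choose the implementation at runtime and store it.
trait SegmentWriter {
    fn write(&mut self, op: &str);
}

struct FileWriter {
    written: Vec<String>,
}

struct NoopWriter;

impl SegmentWriter for FileWriter {
    fn write(&mut self, op: &str) {
        self.written.push(op.to_string());
    }
}

impl SegmentWriter for NoopWriter {
    fn write(&mut self, _op: &str) {}
}

fn open_writer(use_wal: bool) -> Box<dyn SegmentWriter> {
    if use_wal {
        Box::new(FileWriter { written: Vec::new() })
    } else {
        Box::new(NoopWriter)
    }
}

struct OpenSegment {
    writer: Box<dyn SegmentWriter>,
}

fn main() {
    // The segment does not care which writer it got; both are `dyn SegmentWriter`.
    let mut segment = OpenSegment { writer: open_writer(false) };
    segment.writer.write("cpu bar=1 10");
}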

@ -104,14 +104,14 @@ impl WalImpl {
Ok(Self { root })
}
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
let writer = WalSegmentWriterImpl::new_or_open(self.root.clone(), segment_id)?;
Ok(writer)
Ok(Box::new(writer))
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
let reader = WalSegmentReaderImpl::new(self.root.clone(), segment_id)?;
Ok(reader)
Ok(Box::new(reader))
}
fn segment_files(&self) -> Result<Vec<SegmentFile>> {
@ -166,11 +166,11 @@ impl WalImpl {
}
impl Wal for WalImpl {
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
self.open_segment_writer(segment_id)
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
self.open_segment_reader(segment_id)
}
@ -247,18 +247,28 @@ impl WalSegmentWriterImpl {
}
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
println!("write batch in impl");
// Ensure the write buffer is always empty before using it.
self.buffer.clear();
self.sequence_number = self.sequence_number.next();
let sequence_number = self.sequence_number.next();
let batch = WalOpBatch {
sequence_number: self.sequence_number,
sequence_number,
ops,
};
let data = serde_json::to_vec(&batch)?;
let bytes_written = self.write_bytes(data)?;
self.bytes_written += bytes_written;
self.sequence_number = sequence_number;
Ok(self.sequence_number)
}
fn write_bytes(&mut self, data: Vec<u8>) -> Result<usize> {
// Only designed to support chunks up to `u32::max` bytes long.
let uncompressed_len = data.len();
u32::try_from(uncompressed_len)?;
@ -270,7 +280,7 @@ impl WalSegmentWriterImpl {
.expect("cannot fail to write to buffer");
// Compress the payload into the reused buffer, recording the crc hash
// as it is wrote.
// as it is written.
let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
encoder.write_all(&data)?;
let (checksum, buf) = encoder
@ -298,9 +308,7 @@ impl WalSegmentWriterImpl {
// fsync the fd
self.f.sync_all().expect("fsync failure");
self.bytes_written += bytes_written;
Ok(self.sequence_number)
Ok(bytes_written)
}
}
@ -313,6 +321,41 @@ impl WalSegmentWriter for WalSegmentWriterImpl {
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
self.write_batch(ops)
}
fn last_sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}
#[derive(Debug)]
pub struct WalSegmentWriterNoopImpl {
segment_id: SegmentId,
sequence_number: SequenceNumber,
}
impl WalSegmentWriterNoopImpl {
pub fn new(segment_id: SegmentId) -> Self {
Self {
segment_id,
sequence_number: SequenceNumber::new(0),
}
}
}
impl WalSegmentWriter for WalSegmentWriterNoopImpl {
fn id(&self) -> SegmentId {
self.segment_id
}
fn write_batch(&mut self, _ops: Vec<WalOp>) -> Result<SequenceNumber> {
let sequence_number = self.sequence_number.next();
self.sequence_number = sequence_number;
Ok(sequence_number)
}
fn last_sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}
#[derive(Debug)]

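A rough standalone sketch of the two ingredients write_bytes combines above: snappy frame compression of the serialized WalOpBatch plus a crc32 checksum that can be recomputed on replay. The crate's exact on-disk framing (the length prefix and the HasherWrapper that streams the hash while writing) is not reproduced here.

use std::io::Write;

fn compress_and_checksum(data: &[u8]) -> std::io::Result<(u32, Vec<u8>)> {
    let mut compressed = Vec::new();
    {
        // Compress the payload using the snappy frame format.
        let mut encoder = snap::write::FrameEncoder::new(&mut compressed);
        encoder.write_all(data)?;
        encoder.flush()?;
    }

    // Checksum the compressed bytes; a reader recomputes this and compares it
    // against the stored value to detect corruption.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&compressed);
    Ok((hasher.finalize(), compressed))
}

fn main() -> std::io::Result<()> {
    let batch = br#"{"sequence_number":1,"ops":[]}"#;
    let (checksum, compressed) = compress_and_checksum(batch)?;
    println!("{} compressed bytes, crc32 {checksum:#010x}", compressed.len());
    Ok(())
}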

@ -0,0 +1,345 @@
//! A single buffer segment used by the write buffer. This is all the data in memory for a
//! single WAL segment. Only one segment should be open for writes in the write buffer at any
//! given time.
use crate::write_buffer::flusher::BufferedWriteResult;
use crate::write_buffer::{FieldData, Row, TableBatch};
use crate::{wal, write_buffer::Result, SegmentId, SequenceNumber, WalOp, WalSegmentWriter};
use arrow::array::{
ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt64Builder,
};
use arrow::datatypes::Int32Type;
use arrow::record_batch::RecordBatch;
use data_types::{ColumnType, NamespaceName, TimestampMinMax};
use schema::Schema;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::oneshot;
#[derive(Debug)]
pub struct OpenBufferSegment {
segment_writer: Box<dyn WalSegmentWriter>,
segment_id: SegmentId,
buffered_data: HashMap<String, DatabaseBuffer>,
// TODO: This is temporarily just the number of rows in the segment. When the buffer gets refactored to use
// different structures, we want this to be a representation of approximate memory usage.
segment_size: usize,
}
impl OpenBufferSegment {
pub fn new(segment_id: SegmentId, segment_writer: Box<dyn WalSegmentWriter>) -> Self {
Self {
buffered_data: Default::default(),
segment_writer,
segment_id,
segment_size: 0,
}
}
pub fn segment_id(&self) -> SegmentId {
self.segment_id
}
pub fn write_batch(&mut self, write_batch: Vec<WalOp>) -> wal::Result<SequenceNumber> {
self.segment_writer.write_batch(write_batch)
}
pub fn table_buffer(&self, db_name: &str, table_name: &str) -> Option<TableBuffer> {
self.buffered_data
.get(db_name)
.and_then(|db_buffer| db_buffer.table_buffers.get(table_name).cloned())
}
/// Adds the batch into the in memory buffer. Returns the number of rows in the segment after the write.
pub(crate) fn buffer_writes(&mut self, write_batch: WriteBatch) -> Result<usize> {
for (db_name, db_batch) in write_batch.database_batches {
let db_buffer = self.buffered_data.entry(db_name.to_string()).or_default();
for (table_name, table_batch) in db_batch.table_batches {
let table_buffer = db_buffer.table_buffers.entry(table_name).or_default();
for (partition_key, partition_batch) in table_batch.partition_batches {
let partition_buffer = table_buffer
.partition_buffers
.entry(partition_key)
.or_default();
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
self.segment_size += partition_batch.rows.len();
partition_buffer.add_rows(partition_batch.rows);
}
}
}
Ok(self.segment_size)
}
}
#[derive(Debug, Default)]
pub(crate) struct WriteBatch {
database_batches: HashMap<NamespaceName<'static>, DatabaseBatch>,
}
impl WriteBatch {
pub(crate) fn add_db_write(
&mut self,
db_name: NamespaceName<'static>,
table_batches: HashMap<String, TableBatch>,
) {
let db_batch = self.database_batches.entry(db_name).or_default();
db_batch.add_table_batches(table_batches);
}
}
#[derive(Debug, Default)]
struct DatabaseBatch {
table_batches: HashMap<String, TableBatch>,
}
impl DatabaseBatch {
fn add_table_batches(&mut self, table_batches: HashMap<String, TableBatch>) {
for (table_name, table_batch) in table_batches {
let write_table_batch = self.table_batches.entry(table_name).or_default();
for (partition_key, partition_batch) in table_batch.partition_batches {
let write_partition_batch = write_table_batch
.partition_batches
.entry(partition_key)
.or_default();
write_partition_batch.rows.extend(partition_batch.rows);
}
}
}
}
pub struct BufferedWrite {
pub wal_op: WalOp,
pub database_write: DatabaseWrite,
pub response_tx: oneshot::Sender<BufferedWriteResult>,
}
pub struct DatabaseWrite {
pub(crate) db_name: NamespaceName<'static>,
pub(crate) table_batches: HashMap<String, TableBatch>,
}
impl DatabaseWrite {
pub fn new(
db_name: NamespaceName<'static>,
table_batches: HashMap<String, TableBatch>,
) -> Self {
Self {
db_name,
table_batches,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct WriteSummary {
pub segment_id: SegmentId,
pub sequence_number: SequenceNumber,
pub buffer_size: usize,
}
#[derive(Debug, Default)]
struct DatabaseBuffer {
table_buffers: HashMap<String, TableBuffer>,
}
#[derive(Debug, Default, Clone)]
pub struct TableBuffer {
pub partition_buffers: HashMap<String, PartitionBuffer>,
}
impl TableBuffer {
#[allow(dead_code)]
pub fn partition_buffer(&self, partition_key: &str) -> Option<&PartitionBuffer> {
self.partition_buffers.get(partition_key)
}
}
#[derive(Debug, Clone)]
pub struct PartitionBuffer {
rows: Vec<Row>,
timestamp_min: i64,
timestamp_max: i64,
}
impl Default for PartitionBuffer {
fn default() -> Self {
Self {
rows: Vec::new(),
timestamp_min: i64::MAX,
timestamp_max: i64::MIN,
}
}
}
impl PartitionBuffer {
pub fn add_rows(&mut self, rows: Vec<Row>) {
for row in rows {
self.timestamp_min = self.timestamp_min.min(row.time);
self.timestamp_max = self.timestamp_max.max(row.time);
self.rows.push(row);
}
}
pub fn timestamp_min_max(&self) -> TimestampMinMax {
TimestampMinMax {
min: self.timestamp_min,
max: self.timestamp_max,
}
}
pub fn row_count(&self) -> usize {
self.rows.len()
}
pub fn rows_to_record_batch(
&self,
schema: &Schema,
column_types: &BTreeMap<String, i16>,
) -> RecordBatch {
let row_count = self.rows.len();
let mut columns = BTreeMap::new();
for (name, column_type) in column_types {
match ColumnType::try_from(*column_type).unwrap() {
ColumnType::Bool => columns.insert(
name,
Builder::Bool(BooleanBuilder::with_capacity(row_count)),
),
ColumnType::F64 => {
columns.insert(name, Builder::F64(Float64Builder::with_capacity(row_count)))
}
ColumnType::I64 => {
columns.insert(name, Builder::I64(Int64Builder::with_capacity(row_count)))
}
ColumnType::U64 => {
columns.insert(name, Builder::U64(UInt64Builder::with_capacity(row_count)))
}
ColumnType::String => columns.insert(name, Builder::String(StringBuilder::new())),
ColumnType::Tag => {
columns.insert(name, Builder::Tag(StringDictionaryBuilder::new()))
}
ColumnType::Time => columns.insert(
name,
Builder::Time(TimestampNanosecondBuilder::with_capacity(row_count)),
),
};
}
for r in &self.rows {
let mut value_added = HashSet::with_capacity(r.fields.len());
for f in &r.fields {
let builder = columns.get_mut(&f.name).unwrap();
match (&f.value, builder) {
(FieldData::Timestamp(v), Builder::Time(b)) => b.append_value(*v),
(FieldData::Tag(v), Builder::Tag(b)) => {
b.append(v).unwrap();
}
(FieldData::String(v), Builder::String(b)) => b.append_value(v),
(FieldData::Integer(v), Builder::I64(b)) => b.append_value(*v),
(FieldData::UInteger(v), Builder::U64(b)) => b.append_value(*v),
(FieldData::Float(v), Builder::F64(b)) => b.append_value(*v),
(FieldData::Boolean(v), Builder::Bool(b)) => b.append_value(*v),
_ => panic!("unexpected field type"),
}
value_added.insert(&f.name);
}
for (name, builder) in &mut columns {
if !value_added.contains(name) {
match builder {
Builder::Bool(b) => b.append_null(),
Builder::F64(b) => b.append_null(),
Builder::I64(b) => b.append_null(),
Builder::U64(b) => b.append_null(),
Builder::String(b) => b.append_null(),
Builder::Tag(b) => b.append_null(),
Builder::Time(b) => b.append_null(),
}
}
}
}
// ensure the order of the columns matches their order in the Arrow schema definition
let mut cols = Vec::with_capacity(columns.len());
let schema = schema.as_arrow();
for f in &schema.fields {
cols.push(columns.remove(f.name()).unwrap().into_arrow());
}
RecordBatch::try_new(schema, cols).unwrap()
}
}
enum Builder {
Bool(BooleanBuilder),
I64(Int64Builder),
F64(Float64Builder),
U64(UInt64Builder),
String(StringBuilder),
Tag(StringDictionaryBuilder<Int32Type>),
Time(TimestampNanosecondBuilder),
}
impl Builder {
fn into_arrow(self) -> ArrayRef {
match self {
Self::Bool(mut b) => Arc::new(b.finish()),
Self::I64(mut b) => Arc::new(b.finish()),
Self::F64(mut b) => Arc::new(b.finish()),
Self::U64(mut b) => Arc::new(b.finish()),
Self::String(mut b) => Arc::new(b.finish()),
Self::Tag(mut b) => Arc::new(b.finish()),
Self::Time(mut b) => Arc::new(b.finish()),
}
}
}
#[derive(Debug)]
pub struct ClosedBufferSegment {}
#[cfg(test)]
mod tests {
use super::*;
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::tests::lp_to_table_batches;
#[test]
fn buffers_rows() {
let mut open_segment = OpenBufferSegment::new(
SegmentId::new(0),
Box::new(WalSegmentWriterNoopImpl::new(SegmentId::new(0))),
);
let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
let batches =
lp_to_table_batches("cpu,tag1=cupcakes bar=1 10\nmem,tag2=snakes bar=2 20", 10);
let mut write_batch = WriteBatch::default();
write_batch.add_db_write(db_name.clone(), batches);
open_segment.buffer_writes(write_batch).unwrap();
let batches = lp_to_table_batches("cpu,tag1=cupcakes bar=2 30", 10);
let mut write_batch = WriteBatch::default();
write_batch.add_db_write(db_name.clone(), batches);
open_segment.buffer_writes(write_batch).unwrap();
let cpu_table = open_segment.table_buffer(&db_name, "cpu").unwrap();
let cpu_partition = cpu_table.partition_buffers.get("1970-01-01").unwrap();
assert_eq!(cpu_partition.rows.len(), 2);
assert_eq!(cpu_partition.timestamp_min, 10);
assert_eq!(cpu_partition.timestamp_max, 30);
let mem_table = open_segment.table_buffer(&db_name, "mem").unwrap();
let mem_partition = mem_table.partition_buffers.get("1970-01-01").unwrap();
assert_eq!(mem_partition.rows.len(), 1);
assert_eq!(mem_partition.timestamp_min, 20);
assert_eq!(mem_partition.timestamp_max, 20);
}
}

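A standalone sketch of the builder pattern rows_to_record_batch uses above: one Arrow builder per column, append_null back-fills any column a row did not provide a value for, and the finished arrays are assembled into a RecordBatch in schema order. The two-column schema is a toy, not the buffer's real schema handling.

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Builder, TimestampNanosecondBuilder};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("bar", DataType::Float64, true),
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
    ]));

    let mut bar = Float64Builder::with_capacity(2);
    let mut time = TimestampNanosecondBuilder::with_capacity(2);

    // First row provides both fields.
    bar.append_value(1.0);
    time.append_value(10);
    // Second row only carries a timestamp, so the missing field is back-filled
    // with a null, keeping every column the same length.
    bar.append_null();
    time.append_value(20);

    let cols: Vec<ArrayRef> = vec![Arc::new(bar.finish()), Arc::new(time.finish())];
    let batch = RecordBatch::try_new(schema, cols).unwrap();
    assert_eq!(batch.num_rows(), 2);
}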

@ -0,0 +1,251 @@
//! Buffers writes and flushes them to the configured wal
use crate::write_buffer::buffer_segment::{BufferedWrite, DatabaseWrite, WriteBatch, WriteSummary};
use crate::write_buffer::{Error, SegmentState, TableBatch};
use crate::{wal, SequenceNumber, WalOp};
use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use data_types::NamespaceName;
use observability_deps::tracing::debug;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::MissedTickBehavior;
// Duration to buffer writes before flushing them to the wal
const BUFFER_FLUSH_INTERVAL: Duration = Duration::from_millis(10);
// The maximum number of buffered writes that can be queued up before backpressure is applied
const BUFFER_CHANNEL_LIMIT: usize = 10_000;
// buffered writes should only fail if the underlying WAL throws an error. They are validated before they
// are buffered. If there is an error, it'll be here
#[derive(Debug, Clone)]
pub enum BufferedWriteResult {
Success(WriteSummary),
Error(String),
}
/// The WriteBufferFlusher buffers writes and flushes them to the configured wal. The wal IO is done in a native
/// thread rather than a tokio task to avoid blocking the tokio runtime. As referenced in this post, continuous
/// long-running IO threads should be off the tokio runtime: `<https://ryhl.io/blog/async-what-is-blocking/>`.
#[derive(Debug)]
pub struct WriteBufferFlusher {
join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
wal_io_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
#[allow(dead_code)]
shutdown_tx: watch::Sender<()>,
buffer_tx: mpsc::Sender<BufferedWrite>,
}
impl WriteBufferFlusher {
pub fn new(segment_state: Arc<RwLock<SegmentState>>) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(());
let (buffer_tx, buffer_rx) = mpsc::channel(BUFFER_CHANNEL_LIMIT);
let (io_flush_tx, io_flush_rx) = bounded(1);
let (io_flush_notify_tx, io_flush_notify_rx) = bounded(1);
let flusher = Self {
join_handle: Default::default(),
wal_io_handle: Default::default(),
shutdown_tx,
buffer_tx,
};
let wal_op_buffer_segment_state = Arc::clone(&segment_state);
*flusher.wal_io_handle.lock() = Some(
std::thread::Builder::new()
.name("write buffer io flusher".to_string())
.spawn(move || {
run_io_flush(segment_state, io_flush_rx, io_flush_notify_tx);
})
.expect("failed to spawn write buffer io flusher thread"),
);
*flusher.join_handle.lock() = Some(tokio::task::spawn(async move {
run_wal_op_buffer(
wal_op_buffer_segment_state,
buffer_rx,
io_flush_tx,
io_flush_notify_rx,
shutdown_rx,
)
.await;
}));
flusher
}
pub async fn write_to_open_segment(
&self,
db_name: NamespaceName<'static>,
table_batches: HashMap<String, TableBatch>,
wal_op: WalOp,
) -> crate::write_buffer::Result<WriteSummary> {
let (response_tx, response_rx) = oneshot::channel();
self.buffer_tx
.send(BufferedWrite {
wal_op,
database_write: DatabaseWrite::new(db_name, table_batches),
response_tx,
})
.await
.expect("wal op buffer thread is dead");
let summary = response_rx.await.expect("wal op buffer thread is dead");
match summary {
BufferedWriteResult::Success(summary) => Ok(summary),
BufferedWriteResult::Error(e) => Err(Error::BufferSegmentError(e)),
}
}
}
async fn run_wal_op_buffer(
segment_state: Arc<RwLock<SegmentState>>,
mut buffer_rx: mpsc::Receiver<BufferedWrite>,
io_flush_tx: CrossbeamSender<Vec<WalOp>>,
io_flush_notify_rx: CrossbeamReceiver<wal::Result<SequenceNumber>>,
mut shutdown: watch::Receiver<()>,
) {
let mut ops = Vec::new();
let mut write_batch = crate::write_buffer::buffer_segment::WriteBatch::default();
let mut notifies = Vec::new();
let mut interval = tokio::time::interval(BUFFER_FLUSH_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
// select on either buffering an op, ticking the flush interval, or shutting down
select! {
Some(buffered_write) = buffer_rx.recv() => {
ops.push(buffered_write.wal_op);
write_batch.add_db_write(buffered_write.database_write.db_name, buffered_write.database_write.table_batches);
notifies.push(buffered_write.response_tx);
},
_ = interval.tick() => {
if ops.is_empty() {
continue;
}
let ops_written = ops.len();
// send ops into IO flush channel and wait for response
io_flush_tx.send(ops).expect("wal io thread is dead");
let res = match io_flush_notify_rx.recv().expect("wal io thread is dead") {
Ok(sequence_number) => {
let open_segment = &mut segment_state.write().open_segment;
match open_segment.buffer_writes(write_batch) {
Ok(buffer_size) => BufferedWriteResult::Success(WriteSummary {
segment_id: open_segment.segment_id(),
sequence_number,
buffer_size,
}),
Err(e) => BufferedWriteResult::Error(e.to_string()),
}
},
Err(e) => BufferedWriteResult::Error(e.to_string()),
};
// notify the watchers of the write response
for response_tx in notifies {
let _ = response_tx.send(res.clone());
}
// reset the buffers
ops = Vec::with_capacity(ops_written);
write_batch = WriteBatch::default();
notifies = Vec::with_capacity(ops_written);
},
_ = shutdown.changed() => {
// shutdown has been requested
debug!("stopping wal op buffer thread");
return;
}
}
}
}
fn run_io_flush(
segment_state: Arc<RwLock<SegmentState>>,
buffer_rx: CrossbeamReceiver<Vec<WalOp>>,
buffer_notify: CrossbeamSender<wal::Result<SequenceNumber>>,
) {
loop {
let batch = match buffer_rx.recv() {
Ok(batch) => batch,
Err(_) => {
// the buffer channel has closed, it's shutdown
debug!("stopping wal io thread");
return;
}
};
let mut state = segment_state.write();
let res = state.open_segment.write_batch(batch);
buffer_notify.send(res).expect("buffer flusher is dead");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::buffer_segment::OpenBufferSegment;
use crate::write_buffer::tests::lp_to_table_batches;
use crate::{LpWriteOp, SegmentId};
#[tokio::test]
async fn flushes_to_open_segment() {
let segment_id = SegmentId::new(3);
let open_segment = OpenBufferSegment::new(
segment_id,
Box::new(WalSegmentWriterNoopImpl::new(segment_id)),
);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));
let flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));
let db_name = NamespaceName::new("db1").unwrap();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: "cpu bar=1 10".to_string(),
default_time: 0,
});
let data = lp_to_table_batches("cpu bar=1 10", 0);
let write_summary = flusher
.write_to_open_segment(db_name.clone(), data, wal_op)
.await
.unwrap();
assert_eq!(write_summary.segment_id, segment_id);
assert_eq!(write_summary.sequence_number, SequenceNumber::new(1));
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: "cpu bar=1 20".to_string(),
default_time: 0,
});
let data = lp_to_table_batches("cpu bar=1 20", 0);
let write_summary = flusher
.write_to_open_segment(db_name.clone(), data, wal_op)
.await
.unwrap();
assert_eq!(write_summary.sequence_number, SequenceNumber::new(2));
let state = segment_state.read();
assert_eq!(state.open_segment.segment_id(), segment_id);
let table_buffer = state
.open_segment
.table_buffer(db_name.as_str(), "cpu")
.unwrap();
let partition_buffer = table_buffer.partition_buffer("1970-01-01").unwrap();
assert_eq!(partition_buffer.row_count(), 2);
}
}

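A toy sketch of the threading model the flusher above is built around: async writers hand a batch of ops to a dedicated OS thread over crossbeam channels, so WAL writes and fsyncs never run on the tokio runtime. The channel names and the simulated IO are illustrative only; the real flusher also drives this from a flush-interval tick inside a select loop.

use crossbeam_channel::bounded;
use std::thread;

#[tokio::main]
async fn main() {
    // One channel carries batches out to the IO thread, the other carries results back.
    let (io_tx, io_rx) = bounded::<Vec<String>>(1);
    let (done_tx, done_rx) = bounded::<std::io::Result<usize>>(1);

    // Long-running IO thread: receives a batch, "persists" it, reports bytes written.
    let io_thread = thread::spawn(move || {
        while let Ok(batch) = io_rx.recv() {
            let bytes: usize = batch.iter().map(|op| op.len()).sum();
            // A real implementation would append to the segment file and fsync here.
            done_tx.send(Ok(bytes)).expect("async side hung up");
        }
    });

    // Async side: buffer a couple of ops, then flush them as a single batch and wait
    // for the IO thread's acknowledgement before answering the original writers.
    let ops = vec!["cpu bar=1 10".to_string(), "cpu bar=1 20".to_string()];
    io_tx.send(ops).expect("wal io thread is dead");
    let written = done_rx.recv().expect("wal io thread is dead").unwrap();
    println!("flushed batch, {written} bytes");

    // Dropping the sender closes the channel and lets the IO thread exit.
    drop(io_tx);
    io_thread.join().unwrap();
}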

@ -1,39 +1,36 @@
//! Implementation of an in-memory buffer for writes
//! Implementation of an in-memory buffer for writes that persists data into a wal if it is configured.
use crate::catalog::{Catalog, DatabaseSchema, TableDefinition};
mod buffer_segment;
mod flusher;
use crate::catalog::{Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME};
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::buffer_segment::{ClosedBufferSegment, OpenBufferSegment, TableBuffer};
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::{
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, SegmentId, Wal, WriteBuffer,
};
use arrow::array::ArrayRef;
use arrow::{
array::{
BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt64Builder,
},
datatypes::Int32Type,
record_batch::RecordBatch,
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, SegmentId, Wal,
WalOp, WriteBuffer,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use data_types::{
column_type_from_field, ChunkId, ChunkOrder, ColumnType, NamespaceName, PartitionKey, TableId,
TimestampMinMax, TransitionPartitionId,
TransitionPartitionId,
};
use datafusion::common::{DataFusionError, Statistics};
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
use iox_catalog::constants::TIME_COLUMN;
use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRange};
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::{QueryChunk, QueryChunkData};
use observability_deps::tracing::{debug, info};
use parking_lot::RwLock;
use schema::sort::SortKey;
use schema::Schema;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use thiserror::Error;
@ -48,6 +45,15 @@ pub enum Error {
existing: ColumnType,
new: ColumnType,
},
#[error("catalog update erorr {0}")]
CatalogUpdateError(#[from] crate::catalog::Error),
#[error("error from wal: {0}")]
WalError(#[from] crate::wal::Error),
#[error("error from buffer segment: {0}")]
BufferSegmentError(String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -62,21 +68,53 @@ pub struct WriteRequest<'a> {
#[derive(Debug)]
pub struct WriteBufferImpl<W> {
catalog: Arc<Catalog>,
buffered_data: RwLock<HashMap<String, DatabaseBuffer>>,
segment_state: Arc<RwLock<SegmentState>>,
#[allow(dead_code)]
wal: Option<Arc<W>>,
write_buffer_flusher: WriteBufferFlusher,
}
#[derive(Debug)]
struct SegmentState {
open_segment: OpenBufferSegment,
#[allow(dead_code)]
persisting_segments: Vec<ClosedBufferSegment>,
}
impl SegmentState {
pub fn new(open_segment: OpenBufferSegment) -> Self {
Self {
open_segment,
persisting_segments: vec![],
}
}
}
impl<W: Wal> WriteBufferImpl<W> {
pub fn new(catalog: Arc<Catalog>, wal: Option<Arc<W>>) -> Self {
Self {
pub fn new(
catalog: Arc<Catalog>,
wal: Option<Arc<W>>,
next_segment_id: SegmentId,
) -> Result<Self> {
let segment_writer = wal
.as_ref()
.map(|w| w.open_segment_writer(next_segment_id))
.transpose()?
.unwrap_or_else(|| Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)));
let open_segment = OpenBufferSegment::new(next_segment_id, segment_writer);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));
let write_buffer_flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));
Ok(Self {
catalog,
buffered_data: RwLock::new(HashMap::new()),
segment_state,
wal,
}
write_buffer_flusher,
})
}
// TODO: write into segments and wal
async fn write_lp(
&self,
db_name: NamespaceName<'static>,
@ -84,35 +122,19 @@ impl<W: Wal> WriteBufferImpl<W> {
default_time: i64,
) -> Result<BufferedWriteRequest> {
debug!("write_lp to {} in writebuffer", db_name);
let (sequence, db) = self.catalog.db_or_create(db_name.as_str());
let result = parse_validate_and_update_schema(
lp,
&db,
&Partitioner::new_per_day_partitioner(),
let result = self.parse_validate_and_update_schema(db_name.clone(), lp, default_time)?;
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time,
)?;
});
if let Some(schema) = result.schema {
debug!("replacing schema for {:?}", schema);
self.catalog
.replace_database(sequence, Arc::new(schema))
.unwrap();
}
let mut buffered_data = self.buffered_data.write();
let db_buffer = buffered_data
.entry(db_name.as_str().to_string())
.or_default();
for (table_name, table_batch) in result.table_batches {
let table_buffer = db_buffer.table_buffers.entry(table_name).or_default();
for (partition_key, partition_batch) in table_batch.partition_batches {
let partition_buffer = table_buffer
.partition_buffers
.entry(partition_key)
.or_default();
partition_buffer.rows.extend(partition_batch.rows);
}
}
let write_summary = self
.write_buffer_flusher
.write_to_open_segment(db_name.clone(), result.table_batches, wal_op)
.await?;
Ok(BufferedWriteRequest {
db_name,
@ -120,11 +142,35 @@ impl<W: Wal> WriteBufferImpl<W> {
line_count: result.line_count,
field_count: result.field_count,
tag_count: result.tag_count,
total_buffer_memory_used: 0,
segment_id: SegmentId(0),
total_buffer_memory_used: write_summary.buffer_size,
segment_id: write_summary.segment_id,
sequence_number: write_summary.sequence_number,
})
}
fn parse_validate_and_update_schema(
&self,
db_name: NamespaceName<'static>,
lp: &str,
default_time: i64,
) -> Result<ValidationResult> {
let (sequence, db) = self.catalog.db_or_create(db_name.as_str());
let mut result = parse_validate_and_update_schema(
lp,
&db,
&Partitioner::new_per_day_partitioner(),
default_time,
)?;
if let Some(schema) = result.schema.take() {
debug!("replacing schema for {:?}", schema);
self.catalog.replace_database(sequence, Arc::new(schema))?;
}
Ok(result)
}
fn get_table_chunks(
&self,
database_name: &str,
@ -144,15 +190,11 @@ impl<W: Wal> WriteBufferImpl<W> {
for (partition_key, partition_buffer) in table_buffer.partition_buffers {
let partition_key: PartitionKey = partition_key.into();
let batch = partition_buffer.rows_to_record_batch(&schema, table.columns());
let column_ranges = Arc::new(partition_buffer.column_ranges);
let batch_stats = create_chunk_statistics(
Some(partition_buffer.rows.len()),
Some(partition_buffer.row_count()),
&schema,
Some(TimestampMinMax {
min: partition_buffer.timestamp_min,
max: partition_buffer.timestamp_max,
}),
Some(&column_ranges),
Some(partition_buffer.timestamp_min_max()),
None,
);
let chunk = BufferChunk {
@ -172,9 +214,26 @@ impl<W: Wal> WriteBufferImpl<W> {
}
fn clone_table_buffer(&self, database_name: &str, table_name: &str) -> Option<TableBuffer> {
let binding = self.buffered_data.read();
let table_buffer = binding.get(database_name)?.table_buffers.get(table_name)?;
Some(table_buffer.clone())
let state = self.segment_state.read();
state.open_segment.table_buffer(database_name, table_name)
}
#[cfg(test)]
fn get_table_record_batches(&self, datbase_name: &str, table_name: &str) -> Vec<RecordBatch> {
let db_schema = self.catalog.db_schema(datbase_name).unwrap();
let table = db_schema.tables.get(table_name).unwrap();
let schema = table.schema.as_ref().cloned().unwrap();
let table_buffer = self.clone_table_buffer(datbase_name, table_name).unwrap();
let mut batches = Vec::with_capacity(table_buffer.partition_buffers.len());
for (_, partition_buffer) in table_buffer.partition_buffers {
let batch = partition_buffer.rows_to_record_batch(&schema, table.columns());
batches.push(batch);
}
batches
}
}
@ -221,128 +280,6 @@ impl<W: Wal> ChunkContainer for WriteBufferImpl<W> {
impl<W: Wal> WriteBuffer for WriteBufferImpl<W> {}
#[derive(Debug, Default)]
struct DatabaseBuffer {
table_buffers: HashMap<String, TableBuffer>,
}
#[derive(Debug, Default, Clone)]
struct TableBuffer {
partition_buffers: HashMap<String, PartitionBuffer>,
}
#[derive(Debug, Default, Clone)]
struct PartitionBuffer {
rows: Vec<Row>,
column_ranges: HashMap<Arc<str>, ColumnRange>,
timestamp_min: i64,
timestamp_max: i64,
}
impl PartitionBuffer {
fn rows_to_record_batch(
&self,
schema: &Schema,
column_types: &BTreeMap<String, i16>,
) -> RecordBatch {
let row_count = self.rows.len();
let mut columns = BTreeMap::new();
for (name, column_type) in column_types {
match ColumnType::try_from(*column_type).unwrap() {
ColumnType::Bool => columns.insert(
name,
Builder::Bool(BooleanBuilder::with_capacity(row_count)),
),
ColumnType::F64 => {
columns.insert(name, Builder::F64(Float64Builder::with_capacity(row_count)))
}
ColumnType::I64 => {
columns.insert(name, Builder::I64(Int64Builder::with_capacity(row_count)))
}
ColumnType::U64 => {
columns.insert(name, Builder::U64(UInt64Builder::with_capacity(row_count)))
}
ColumnType::String => columns.insert(name, Builder::String(StringBuilder::new())),
ColumnType::Tag => {
columns.insert(name, Builder::Tag(StringDictionaryBuilder::new()))
}
ColumnType::Time => columns.insert(
name,
Builder::Time(TimestampNanosecondBuilder::with_capacity(row_count)),
),
};
}
for r in &self.rows {
let mut value_added = HashSet::with_capacity(r.fields.len());
for f in &r.fields {
let builder = columns.get_mut(&f.name).unwrap();
match (&f.value, builder) {
(FieldData::Timestamp(v), Builder::Time(b)) => b.append_value(*v),
(FieldData::Tag(v), Builder::Tag(b)) => {
b.append(v).unwrap();
}
(FieldData::String(v), Builder::String(b)) => b.append_value(v),
(FieldData::Integer(v), Builder::I64(b)) => b.append_value(*v),
(FieldData::UInteger(v), Builder::U64(b)) => b.append_value(*v),
(FieldData::Float(v), Builder::F64(b)) => b.append_value(*v),
(FieldData::Boolean(v), Builder::Bool(b)) => b.append_value(*v),
_ => panic!("unexpected field type"),
}
value_added.insert(&f.name);
}
for (name, builder) in &mut columns {
if !value_added.contains(name) {
match builder {
Builder::Bool(b) => b.append_null(),
Builder::F64(b) => b.append_null(),
Builder::I64(b) => b.append_null(),
Builder::U64(b) => b.append_null(),
Builder::String(b) => b.append_null(),
Builder::Tag(b) => b.append_null(),
Builder::Time(b) => b.append_null(),
}
}
}
}
// ensure the order of the columns matches their order in the Arrow schema definition
let mut cols = Vec::with_capacity(columns.len());
let schema = schema.as_arrow();
for f in &schema.fields {
cols.push(columns.remove(f.name()).unwrap().into_arrow());
}
RecordBatch::try_new(schema, cols).unwrap()
}
}
enum Builder {
Bool(BooleanBuilder),
I64(Int64Builder),
F64(Float64Builder),
U64(UInt64Builder),
String(StringBuilder),
Tag(StringDictionaryBuilder<Int32Type>),
Time(TimestampNanosecondBuilder),
}
impl Builder {
fn into_arrow(self) -> ArrayRef {
match self {
Self::Bool(mut b) => Arc::new(b.finish()),
Self::I64(mut b) => Arc::new(b.finish()),
Self::F64(mut b) => Arc::new(b.finish()),
Self::U64(mut b) => Arc::new(b.finish()),
Self::String(mut b) => Arc::new(b.finish()),
Self::Tag(mut b) => Arc::new(b.finish()),
Self::Time(mut b) => Arc::new(b.finish()),
}
}
}
#[derive(Debug)]
pub struct BufferChunk {
batches: Vec<RecordBatch>,
@ -402,6 +339,7 @@ impl QueryChunk for BufferChunk {
self
}
}
const YEAR_MONTH_DAY_TIME_FORMAT: &str = "%Y-%m-%d";
/// Takes &str of line protocol, parses lines, validates the schema, and inserts new columns
@ -519,7 +457,7 @@ fn validate_and_convert_parsed_line(
columns.insert(field_name.to_string(), column_type_from_field(value) as i16);
}
columns.insert(TIME_COLUMN.to_string(), ColumnType::Time as i16);
columns.insert(TIME_COLUMN_NAME.to_string(), ColumnType::Time as i16);
let table = TableDefinition::new(table_name, columns);
@ -567,7 +505,7 @@ fn validate_and_convert_parsed_line(
// set the time value
let time_value = line.timestamp.unwrap_or(default_time);
values.push(Field {
name: TIME_COLUMN.to_string(),
name: TIME_COLUMN_NAME.to_string(),
value: FieldData::Timestamp(time_value),
});
@ -586,7 +524,7 @@ fn validate_and_convert_parsed_line(
Ok(())
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Default)]
pub(crate) struct TableBatch {
#[allow(dead_code)]
pub(crate) name: String,
@ -594,24 +532,24 @@ pub(crate) struct TableBatch {
pub(crate) partition_batches: HashMap<String, PartitionBatch>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Default)]
pub(crate) struct PartitionBatch {
pub(crate) rows: Vec<Row>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug)]
pub(crate) struct Row {
pub(crate) time: i64,
pub(crate) fields: Vec<Field>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug)]
pub(crate) struct Field {
pub(crate) name: String,
pub(crate) value: FieldData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug)]
pub(crate) enum FieldData {
Timestamp(i64),
Tag(String),
@ -672,7 +610,9 @@ impl Partitioner {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use crate::wal::WalImpl;
use crate::{SequenceNumber, WalOpBatch};
use arrow_util::assert_batches_eq;
#[test]
fn parse_lp_into_buffer() {
@ -688,4 +628,57 @@ mod tests {
assert_eq!(db.tables.get("cpu").unwrap().columns().len(), 3);
assert_eq!(db.tables.get("foo").unwrap().columns().len(), 2);
}
#[tokio::test]
async fn buffers_and_persists_to_wal() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = WalImpl::new(dir.clone()).unwrap();
let catalog = Arc::new(Catalog::new());
let write_buffer =
WriteBufferImpl::new(catalog, Some(Arc::new(wal)), SegmentId::new(0)).unwrap();
let summary = write_buffer
.write_lp(NamespaceName::new("foo").unwrap(), "cpu bar=1 10", 123)
.await
.unwrap();
assert_eq!(summary.line_count, 1);
assert_eq!(summary.field_count, 1);
assert_eq!(summary.tag_count, 0);
assert_eq!(summary.total_buffer_memory_used, 1);
assert_eq!(summary.segment_id, SegmentId::new(0));
assert_eq!(summary.sequence_number, SequenceNumber::new(1));
// ensure the data is in the buffer
let actual = write_buffer.get_table_record_batches("foo", "cpu");
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 1.0 | 1970-01-01T00:00:00.000000010Z |",
"+-----+--------------------------------+",
];
assert_batches_eq!(&expected, &actual);
// ensure the data is in the wal
let wal = WalImpl::new(dir).unwrap();
let mut reader = wal.open_segment_reader(SegmentId::new(0)).unwrap();
let batch = reader.next_batch().unwrap().unwrap();
let expected_batch = WalOpBatch {
sequence_number: SequenceNumber::new(1),
ops: vec![WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu bar=1 10".to_string(),
default_time: 123,
})],
};
assert_eq!(batch, expected_batch);
}
pub(crate) fn lp_to_table_batches(lp: &str, default_time: i64) -> HashMap<String, TableBatch> {
let db = Arc::new(DatabaseSchema::new("foo"));
let partitioner = Partitioner::new_per_day_partitioner();
let result = parse_validate_and_update_schema(lp, &db, &partitioner, default_time).unwrap();
result.table_batches
}
}
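Finally, a small sketch of where the "1970-01-01" partition keys in the tests above come from: rows are partitioned per day by formatting their nanosecond timestamp with the YEAR_MONTH_DAY_TIME_FORMAT pattern. The Partitioner itself is only partly visible in this diff, so the conversion below is an assumption of the general idea, not its exact code.

use chrono::{TimeZone, Utc};

fn partition_key(time_ns: i64) -> String {
    // Split nanoseconds into whole seconds plus the sub-second remainder.
    let secs = time_ns.div_euclid(1_000_000_000);
    let nanos = time_ns.rem_euclid(1_000_000_000) as u32;
    Utc.timestamp_opt(secs, nanos)
        .single()
        .expect("timestamp out of range")
        .format("%Y-%m-%d")
        .to_string()
}

fn main() {
    // Writes at t=10ns and t=30ns both land in the 1970-01-01 partition.
    assert_eq!(partition_key(10), "1970-01-01");
    // One full day later rolls over to the next partition.
    assert_eq!(partition_key(86_400_000_000_000), "1970-01-02");
}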