fix: Remove write buffer

pull/24376/head
Carol (Nichols || Goulding) 2021-05-21 14:30:56 -04:00
parent 42f26b609b
commit f4a9a5ae56
30 changed files with 25 additions and 3571 deletions

View File

@@ -189,10 +189,9 @@ cargo clippy --all-targets --workspace -- -D warnings
## Upgrading the `flatbuffers` crate
IOx uses Flatbuffers for its write buffer. The structure is defined in
[`generated_types/protos/write_buffer.fbs`]. We have then used the `flatc` Flatbuffers compiler to
generate the corresponding Rust code in [`generated_types/src/write_buffer_generated.rs`], which
is checked in to the repository.
IOx uses Flatbuffers for some of its messages. The structure is defined in [`entry/src/entry.fbs`].
We have then used the `flatc` Flatbuffers compiler to generate the corresponding Rust code in
[`entry/src/entry_generated.rs`], which is checked in to the repository.
The checked-in code is compatible with the `flatbuffers` crate version in the `Cargo.lock` file. If
upgrading the version of the `flatbuffers` crate that IOx depends on, the generated code will need
@@ -200,6 +199,6 @@ to be updated as well.
Instructions for updating the generated code are in [`docs/regenerating_flatbuffers.md`].
[`generated_types/protos/write_buffer.fbs`]: generated_types/protos/write_buffer.fbs
[`generated_types/src/write_buffer_generated.rs`]: generated_types/src/write_buffer_generated.rs
[`entry/src/entry.fbs`]: entry/src/entry.fbs
[`entry/src/entry_generated.rs`]: entry/src/entry_generated.rs
[`docs/regenerating_flatbuffers.md`]: docs/regenerating_flatbuffers.md
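As context for the compatibility note above, the checked-in generated code is only usable when it agrees with the `flatbuffers` crate version in `Cargo.lock`, because every read goes through that crate's verifier. A minimal sketch of that usage (the `entry_fb` module path is an assumption; it mirrors how `entry/src/entry.rs` names the generated module):

```rust
// Sketch only: verify raw bytes against the checked-in generated code.
// `flatbuffers::root` runs the verifier before handing back a table view,
// so this compiles and works only when entry_generated.rs and the
// `flatbuffers` crate version agree.
fn parse_entry(data: &[u8]) -> Result<entry_fb::Entry<'_>, flatbuffers::InvalidFlatbuffer> {
    flatbuffers::root::<entry_fb::Entry<'_>>(data)
}
```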

Cargo.lock generated
View File

@@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
@@ -1063,7 +1065,6 @@ name = "entry"
version = "0.1.0"
dependencies = [
"chrono",
"criterion",
"data_types",
"flatbuffers",
"influxdb_line_protocol",
@@ -4862,25 +4863,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "write_buffer"
version = "0.1.0"
dependencies = [
"byteorder",
"crc32fast",
"futures",
"itertools 0.9.0",
"observability_deps",
"once_cell",
"regex",
"serde",
"serde_json",
"snafu",
"snap",
"test_helpers",
"tokio",
]
[[package]]
name = "xml-rs"
version = "0.8.3"

View File

@@ -35,7 +35,6 @@ members = [
"server_benchmarks",
"test_helpers",
"tracker",
"write_buffer",
]
[profile.release]

View File

@@ -40,10 +40,6 @@ pub struct DatabaseRules {
/// db
pub partition_template: PartitionTemplate,
/// When set, this will buffer writes in memory based on the
/// configuration.
pub write_buffer_config: Option<WriteBufferConfig>,
/// Configure how data flows through the system
pub lifecycle_rules: LifecycleRules,
@@ -81,7 +77,6 @@ impl DatabaseRules {
Self {
name,
partition_template: Default::default(),
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(500),
@@ -235,58 +230,6 @@ pub enum ColumnValue {
Max,
}
/// `WriteBufferConfig` defines the configuration for buffering data from writes
/// in memory. This buffer is used for asynchronous replication and to collect
/// segments before sending them to object storage.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct WriteBufferConfig {
/// The size the Write Buffer should be limited to. Once the buffer gets to
/// this size, it will drop old segments to remain below this size, but
/// still try to hold as much in memory as possible while remaining
/// below this threshold
pub buffer_size: usize,
/// Write Buffer segments become read-only after crossing over this size,
/// which means that segments will always be >= this size. When old segments
/// are dropped from memory, at least this much space will be freed from
/// the buffer.
pub segment_size: usize,
/// What should happen if a write comes in that would exceed the Write
/// Buffer size and the oldest segment that could be dropped hasn't yet been
/// persisted to object storage. If the oldest segment has been persisted,
/// then it will be dropped from the buffer so that new writes can be
/// accepted. This option is only for defining the behaviour of what happens
/// if that segment hasn't been persisted. If set to return an error, new
/// writes will be rejected until the oldest segment has been persisted so
/// that it can be cleared from memory. Alternatively, this can be set so
/// that old segments are dropped even if they haven't been persisted. This
/// setting is also useful for cases where persistence isn't being used and
/// this is only for in-memory buffering.
pub buffer_rollover: WriteBufferRollover,
/// If set to true, buffer segments will be written to object storage.
pub store_segments: bool,
/// If set, segments will be rolled over after this period of time even
/// if they haven't hit the size threshold. This allows them to be written
/// out to object storage as they must be immutable first.
pub close_segment_after: Option<std::time::Duration>,
}
/// `WriteBufferRollover` defines the behavior of what should happen if a write
/// comes in that would cause the buffer to exceed its max size AND the oldest
/// segment can't be dropped because it has not yet been persisted.
#[derive(Debug, Clone, Eq, PartialEq, Copy)]
pub enum WriteBufferRollover {
/// Drop the old segment even though it hasn't been persisted. This part of
/// the Write Buffer will be lost on this server.
DropOldSegment,
/// Drop the incoming write and fail silently. This favors making sure that
/// older Write Buffer data will be backed up.
DropIncoming,
/// Reject the incoming write and return an error. The client may retry the
/// request, which will succeed once the oldest segment has been
/// persisted to object storage.
ReturnError,
}
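As an illustration of the configuration being removed here, a hypothetical `WriteBufferConfig` built from the fields above (all values invented for the example):

```rust
// Sketch: buffer up to 1 GiB in memory, close segments at 10 MiB or after 60
// seconds, persist closed segments, and reject writes rather than drop
// unpersisted data.
let config = WriteBufferConfig {
    buffer_size: 1024 * 1024 * 1024,
    segment_size: 10 * 1024 * 1024,
    buffer_rollover: WriteBufferRollover::ReturnError,
    store_segments: true,
    close_segment_after: Some(std::time::Duration::from_secs(60)),
};
```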
/// `PartitionTemplate` is used to compute the partition key of each row that
/// gets written. It can consist of the table name, a column name and its value,
/// a formatted time, or a string column and regex captures of its value. For

View File

@@ -1,20 +0,0 @@
//! This module contains structs for the HTTP API
use crate::write_buffer::SegmentSummary;
use serde::{Deserialize, Serialize};
/// Query string for the Write Buffer metadata endpoint
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct WriteBufferMetadataQuery {
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub newer_than: Option<chrono::DateTime<chrono::Utc>>,
}
/// Response for the Write Buffer metadata endpoint
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct WriteBufferMetadataResponse {
pub segments: Vec<SegmentSummary>,
}
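These structs, also removed here, were (de)serialized from the metadata endpoint's query string via serde. A sketch of that round trip, assuming `serde_urlencoded` (or an equivalent) on the HTTP side, which this diff does not show:

```rust
// Sketch: parse a query string into the metadata query. Missing Option
// fields deserialize to None.
let q: WriteBufferMetadataQuery =
    serde_urlencoded::from_str("limit=10&offset=0").unwrap();
assert_eq!(q.limit, Some(10));
assert_eq!(q.offset, Some(0));
assert_eq!(q.newer_than, None);
```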

View File

@@ -16,10 +16,8 @@ mod database_name;
pub use database_name::*;
pub mod database_rules;
pub mod error;
pub mod http;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod server_id;
pub mod timestamp;
pub mod write_buffer;

View File

@@ -15,10 +15,3 @@ snafu = "0.6"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.8.3"
internal_types = { path = "../internal_types" }
[dev-dependencies] # In alphabetical order
criterion = "0.3"
[[bench]]
name = "benchmark"
harness = false

View File

@@ -1,53 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use data_types::{database_rules::ShardConfig, server_id::ServerId};
use entry::{
lines_to_sharded_entries, test_helpers::partitioner, ClockValue, OwnedSequencedEntry,
SequencedEntry,
};
use std::convert::TryFrom;
static LINES: &str = include_str!("../../tests/fixtures/lineproto/prometheus.lp");
fn sequenced_entry(c: &mut Criterion) {
let mut group = c.benchmark_group("sequenced_entry_generator");
let lines = influxdb_line_protocol::parse_lines(LINES)
.collect::<Result<Vec<_>, _>>()
.unwrap();
let shard_config: Option<&ShardConfig> = None;
let default_time = 456;
let sharded_entries =
lines_to_sharded_entries(&lines, default_time, shard_config, &partitioner(1)).unwrap();
let entry = &sharded_entries.first().unwrap().entry;
let data = entry.data();
assert_eq!(
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.row_count(),
554
);
let clock_value = ClockValue::try_from(23).unwrap();
let server_id = ServerId::try_from(2).unwrap();
group.bench_function("new_from_entry_bytes", |b| {
b.iter(|| {
let sequenced_entry =
OwnedSequencedEntry::new_from_entry_bytes(clock_value, server_id, data).unwrap();
assert_eq!(sequenced_entry.clock_value(), clock_value);
assert_eq!(sequenced_entry.server_id(), server_id);
})
});
group.finish();
}
criterion_group!(benches, sequenced_entry);
criterion_main!(benches);

View File

@@ -1,20 +1,7 @@
namespace influxdata.iox.write.v1;
// Every modification to a database is represented as an entry. These can be forwarded
// on to other IOx servers or can be wrapped with a logical clock value and server id
// for ordering in a buffer or to subscribers. An Entry is what gets sent from the
// routing layer to the write buffer layer. The write buffer layer uses sequenced entries
// to replicate to its peers and to send to downstream subscribers.
//
// Take the example of sharding, where an IOx server is configured to split an
// incoming write into shards and send the write onto other servers. The batch of line protocol
// written in a request will be split up so that each shard with data will have a
// single Entry that will be sent to it. If the server that is doing the
// sharding is not generating partition keys, the key in partition write won't be
// present. It can be generated downstream. Although it's better to have the sharding
// layer generate the partition keys while it's at the job of parsing and validating
// the line protocol. This will save the downstream stateful layers from doing
// extra work.
// on to other IOx servers or to Kafka.
// An entry can only be one of these Operation types
union Operation {
@@ -121,43 +108,3 @@ table BytesValues {
table BytesValue {
data: [ubyte];
}
// The following definitions are for the write buffer and for downstream subscribers to
// the buffer.
// Segment is a collection of Entries. It is the payload of a Write Buffer
// segment file. Because Write Buffer servers can replicate data with peers,
// entries in the collection may come from different server ids. Entries within
// a segment should be ordered in ascending order by clock value, then server id.
table Segment {
// the segment number
id: uint64;
// the id of the server that persisted this segment
server_id: uint32;
// this is the clock value that the server has verified with its peers that
// entries are ordered and consistent up to. If the server has no peers this
// value will just be the clock value for the last sequenced entry in this segment.
// This value could be lower than the first clock value in the segment if a
// consistency check has not occurred since this segment was started. Or, if
// a consistency check has occurred in a segment following this one, but before
// this segment has been persisted, the clock value could be higher than the last
// entry in the segment. This latter condition indicates that all entries in the
// segment are properly ordered and consistent with other write buffer peers.
consistency_high_water_clock: uint64;
// the raw entry data along with their sequence numbers
entries: [SequencedEntry];
}
// SequencedEntries are what get inserted into a Write Buffer. These are
// what Write Buffer servers replicate to their peers and what go out to
// downstream subscribers of the Buffer. The clock values can be used to order
// the entries from a Buffer server. They are guaranteed to never go backwards.
table SequencedEntry {
clock_value: uint64;
server_id: uint32;
// The raw bytes for an Entry flatbuffers. Because we can't build the SequencedEntry
// Flatbuffers from the bytes of an Entry, we do this to avoid reconstructing the
// whole thing. See the following for examples and a little context on it:
// https://github.com/influxdata/influxdb_iox/pull/1149
entry_bytes: [ubyte];
}
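The schema comments above describe how an Entry is produced by the routing layer. A short sketch of that flow, pieced together from the benchmark and test code elsewhere in this diff (the helper name `lp_to_entry_bytes` is invented for the example):

```rust
use data_types::database_rules::ShardConfig;
use entry::{lines_to_sharded_entries, test_helpers::partitioner};

// Turn line protocol into the flatbuffers Entry bytes described above.
fn lp_to_entry_bytes(lp: &str) -> Vec<u8> {
    let lines = influxdb_line_protocol::parse_lines(lp)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    // No sharding here, and an arbitrary default timestamp for lines without one.
    let shard_config: Option<&ShardConfig> = None;
    let sharded_entries =
        lines_to_sharded_entries(&lines, 42, shard_config, &partitioner(1)).unwrap();
    sharded_entries.first().unwrap().entry.data().to_vec()
}
```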

View File

@@ -1,23 +1,14 @@
//! This module contains helper code for building `Entry` and `SequencedEntry`
//! from line protocol and the `DatabaseRules` configuration.
//! This module contains helper code for building `Entry` from line protocol and the
//! `DatabaseRules` configuration.
use std::sync::Arc;
use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
fmt::Formatter,
num::NonZeroU64,
};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Formatter, num::NonZeroU64};
use chrono::Utc;
use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
use ouroboros::self_referencing;
use snafu::{OptionExt, ResultExt, Snafu};
use data_types::{
database_rules::{Error as DataError, Partitioner, ShardId, Sharder},
server_id::ServerId,
};
use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder};
use influxdb_line_protocol::{FieldValue, ParsedLine};
use internal_types::schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME};
@@ -1199,315 +1190,6 @@ enum InnerClockValueError {
ValueMayNotBeZero,
}
pub trait SequencedEntry: Send + Sync + std::fmt::Debug {
fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>>;
fn fb(&self) -> &entry_fb::SequencedEntry<'_>;
fn size(&self) -> usize;
fn clock_value(&self) -> ClockValue {
self.fb()
.clock_value()
.try_into()
.expect("ClockValue should have been validated when this was built from flatbuffers")
}
fn server_id(&self) -> ServerId {
self.fb()
.server_id()
.try_into()
.expect("ServerId should have been validated when this was built from flatbuffers")
}
}
#[self_referencing]
#[derive(Debug)]
pub struct OwnedSequencedEntry {
data: Vec<u8>,
#[borrows(data)]
#[covariant]
fb: entry_fb::SequencedEntry<'this>,
#[borrows(data)]
#[covariant]
entry: Option<entry_fb::Entry<'this>>,
}
impl OwnedSequencedEntry {
pub fn new_from_entry_bytes(
clock_value: ClockValue,
server_id: ServerId,
entry_bytes: &[u8],
) -> Result<Self> {
// The flatbuffer contains:
// 1xu64 -> clock_value
// 1xu32 -> server_id
// 0? -> entry (unused here)
// input -> entry_bytes
// The buffer also needs space for the flatbuffer vtable.
const OVERHEAD: usize = 4 * std::mem::size_of::<u64>();
let mut fbb = FlatBufferBuilder::new_with_capacity(entry_bytes.len() + OVERHEAD);
let entry_bytes = fbb.create_vector_direct(entry_bytes);
let sequenced_entry = entry_fb::SequencedEntry::create(
&mut fbb,
&entry_fb::SequencedEntryArgs {
clock_value: clock_value.get_u64(),
server_id: server_id.get_u32(),
entry_bytes: Some(entry_bytes),
},
);
fbb.finish(sequenced_entry, None);
let (mut data, idx) = fbb.collapse();
let sequenced_entry = Self::try_from(data.split_off(idx))
.expect("Flatbuffer data just constructed should be valid");
Ok(sequenced_entry)
}
}
impl SequencedEntry for OwnedSequencedEntry {
fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
match self.borrow_entry().as_ref() {
Some(e) => match e.operation_as_write().as_ref() {
Some(w) => w
.partition_writes()
.as_ref()
.map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::<Vec<_>>()),
None => None,
},
None => None,
}
}
/// Returns the Flatbuffers struct for the SequencedEntry
fn fb(&self) -> &entry_fb::SequencedEntry<'_> {
self.borrow_fb()
}
fn size(&self) -> usize {
self.borrow_data().len()
}
}
#[derive(Debug, Snafu)]
pub enum SequencedEntryError {
#[snafu(display("{}", source))]
InvalidFlatbuffer {
source: flatbuffers::InvalidFlatbuffer,
},
#[snafu(display("{}", source))]
InvalidServerId {
source: data_types::server_id::Error,
},
#[snafu(display("{}", source))]
InvalidClockValue { source: ClockValueError },
#[snafu(display("entry bytes not present in sequenced entry"))]
EntryBytesMissing,
}
impl TryFrom<Vec<u8>> for OwnedSequencedEntry {
type Error = SequencedEntryError;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
OwnedSequencedEntryTryBuilder {
data,
fb_builder: |data| {
let fb = flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data)
.context(InvalidFlatbuffer)?;
// Raise an error now if the server ID is invalid so that `SequencedEntry`'s
// `server_id` method can assume it has a valid `ServerId`
TryInto::<ServerId>::try_into(fb.server_id()).context(InvalidServerId)?;
// Raise an error now if the clock value is invalid so that `SequencedEntry`'s
// `clock_value` method can assume it has a valid `ClockValue`
TryInto::<ClockValue>::try_into(fb.clock_value()).context(InvalidClockValue)?;
Ok(fb)
},
entry_builder: |data| match flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data)
.context(InvalidFlatbuffer)?
.entry_bytes()
{
Some(entry_bytes) => Ok(Some(
flatbuffers::root::<entry_fb::Entry<'_>>(&entry_bytes)
.context(InvalidFlatbuffer)?,
)),
None => Ok(None),
},
}
.try_build()
}
}
#[derive(Debug)]
pub struct BorrowedSequencedEntry<'a> {
fb: entry_fb::SequencedEntry<'a>,
entry: Option<entry_fb::Entry<'a>>,
}
impl SequencedEntry for BorrowedSequencedEntry<'_> {
fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
match self.entry.as_ref() {
Some(e) => match e.operation_as_write().as_ref() {
Some(w) => w
.partition_writes()
.as_ref()
.map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::<Vec<_>>()),
None => None,
},
None => None,
}
}
fn fb(&self) -> &entry_fb::SequencedEntry<'_> {
&self.fb
}
fn size(&self) -> usize {
self.fb._tab.buf.len()
}
}
#[self_referencing]
#[derive(Debug)]
pub struct Segment {
data: Vec<u8>,
#[borrows(data)]
#[covariant]
fb: entry_fb::Segment<'this>,
#[borrows(data)]
#[covariant]
sequenced_entries: Vec<BorrowedSequencedEntry<'this>>,
}
impl Segment {
pub fn new_from_entries(
segment_id: u64,
server_id: ServerId,
clock_value: Option<ClockValue>,
entries: &[Arc<dyn SequencedEntry>],
) -> Self {
let mut fbb = FlatBufferBuilder::new_with_capacity(1024);
let entries = entries
.iter()
.map(|e| {
let entry_bytes = fbb.create_vector_direct(
e.fb()
.entry_bytes()
.expect("entry must be present in sequenced entry when initialized"),
);
entry_fb::SequencedEntry::create(
&mut fbb,
&entry_fb::SequencedEntryArgs {
server_id: e.server_id().get_u32(),
clock_value: e.clock_value().get_u64(),
entry_bytes: Some(entry_bytes),
},
)
})
.collect::<Vec<_>>();
let entries = fbb.create_vector(&entries);
let segment = entry_fb::Segment::create(
&mut fbb,
&entry_fb::SegmentArgs {
id: segment_id,
server_id: server_id.get_u32(),
consistency_high_water_clock: clock_value.map(|c| c.get_u64()).unwrap_or(0),
entries: Some(entries),
},
);
fbb.finish(segment, None);
let (mut data, idx) = fbb.collapse();
Self::try_from(data.split_off(idx))
.expect("Flatbuffer data for sequenced entry just constructed should be valid")
}
/// Returns the Flatbuffers struct for the Segment
pub fn fb(&self) -> &entry_fb::Segment<'_> {
self.borrow_fb()
}
/// Returns the serialized bytes for the Segment
pub fn data(&self) -> &[u8] {
self.borrow_data()
}
pub fn consistency_high_water_clock(&self) -> Option<ClockValue> {
self.fb().consistency_high_water_clock().try_into().ok()
}
pub fn server_id(&self) -> ServerId {
self.fb()
.server_id()
.try_into()
.expect("ServerId should have been validated when this was built from flatbuffers")
}
}
impl TryFrom<Vec<u8>> for Segment {
type Error = SequencedEntryError;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
SegmentTryBuilder {
data,
fb_builder: |data| {
let fb =
flatbuffers::root::<entry_fb::Segment<'_>>(data).context(InvalidFlatbuffer)?;
// Raise an error now if the server ID is invalid so that `SequencedEntry`'s
// `server_id` method can assume it has a valid `ServerId`
TryInto::<ServerId>::try_into(fb.server_id()).context(InvalidServerId)?;
Ok(fb)
},
sequenced_entries_builder: |data| match flatbuffers::root::<entry_fb::Segment<'_>>(data)
.context(InvalidFlatbuffer)?
.entries()
{
Some(entries) => {
Ok(entries
.iter()
.map(|fb| {
// Raise an error now if the server ID is invalid so that `SequencedEntry`'s
// `server_id` method can assume it has a valid `ServerId`
TryInto::<ServerId>::try_into(fb.server_id())
.context(InvalidServerId)?;
// Raise an error now if the clock value is invalid so that `SequencedEntry`'s
// `clock_value` method can assume it has a valid `ClockValue`
TryInto::<ClockValue>::try_into(fb.clock_value())
.context(InvalidClockValue)?;
Ok(BorrowedSequencedEntry {
fb,
entry: Some(
flatbuffers::root::<entry_fb::Entry<'_>>(
&fb.entry_bytes().context(EntryBytesMissing)?,
)
.context(InvalidFlatbuffer)?,
),
})
})
.collect::<Result<Vec<_>, Self::Error>>()?)
}
None => Ok(vec![]),
},
}
.try_build()
}
}
pub mod test_helpers {
use chrono::TimeZone;
@@ -1564,20 +1246,6 @@ pub mod test_helpers {
.collect::<Vec<_>>()
}
/// Converts the line protocol to a `SequencedEntry` with the given server id
/// and clock value
pub fn lp_to_sequenced_entry(
lp: &str,
server_id: u32,
clock_value: u64,
) -> OwnedSequencedEntry {
let entry = lp_to_entry(lp);
let server_id = ServerId::try_from(server_id).unwrap();
let clock_value = ClockValue::try_from(clock_value).unwrap();
OwnedSequencedEntry::new_from_entry_bytes(clock_value, server_id, entry.data()).unwrap()
}
/// Returns a test sharder that will assign shard ids from [0, count)
/// incrementing for each line.
pub fn sharder(count: ShardId) -> Option<TestSharder> {
@@ -2257,185 +1925,4 @@ mod tests {
assert!(sharded_entries.is_err());
}
#[test]
fn sequenced_entry() {
let lp = vec![
"a,host=a val=23i 983",
"a,host=a,region=west val2=23.2 2343",
"a val=21i,bool=true,string=\"hello\" 222",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = lines_to_sharded_entries(
&lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data();
let clock_value = ClockValue::try_from(23).unwrap();
let server_id = ServerId::try_from(2).unwrap();
let sequenced_entry =
OwnedSequencedEntry::new_from_entry_bytes(clock_value, server_id, entry_bytes).unwrap();
assert_eq!(sequenced_entry.clock_value(), clock_value);
assert_eq!(sequenced_entry.server_id(), server_id);
let partition_writes = sequenced_entry.partition_writes().unwrap();
let table_batches = partition_writes.first().unwrap().table_batches();
let batch = table_batches.first().unwrap();
let columns = batch.columns();
assert_eq!(batch.row_count(), 3);
assert_eq!(columns.len(), 7);
let col = columns.get(0).unwrap();
assert_eq!(col.name(), "bool");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().bool_values().unwrap();
assert_eq!(&values, &[None, None, Some(true)]);
let col = columns.get(1).unwrap();
assert_eq!(col.name(), "host");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[Some("a"), Some("a"), None]);
let col = columns.get(2).unwrap();
assert_eq!(col.name(), "region");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[None, Some("west"), None]);
let col = columns.get(3).unwrap();
assert_eq!(col.name(), "string");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[None, None, Some("hello")]);
let col = columns.get(4).unwrap();
assert_eq!(col.name(), TIME_COLUMN_NAME);
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Time);
let values = col.values().i64_values().unwrap();
assert_eq!(&values, &[Some(983), Some(2343), Some(222)]);
let col = columns.get(5).unwrap();
assert_eq!(col.name(), "val");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().i64_values().unwrap();
assert_eq!(&values, &[Some(23), None, Some(21)]);
let col = columns.get(6).unwrap();
assert_eq!(col.name(), "val2");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().f64_values().unwrap();
assert_eq!(&values, &[None, Some(23.2), None]);
}
#[test]
fn validate_sequenced_entry_server_id() {
let lp = vec![
"a,host=a val=23i 983",
"a,host=a,region=west val2=23.2 2343",
"a val=21i,bool=true,string=\"hello\" 222",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = lines_to_sharded_entries(
&lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data();
const OVERHEAD: usize = 4 * std::mem::size_of::<u64>();
let mut fbb = FlatBufferBuilder::new_with_capacity(entry_bytes.len() + OVERHEAD);
let entry_bytes = fbb.create_vector_direct(entry_bytes);
let sequenced_entry = entry_fb::SequencedEntry::create(
&mut fbb,
&entry_fb::SequencedEntryArgs {
clock_value: 3,
server_id: 0, // <----- IMPORTANT PART this is invalid and should error
entry_bytes: Some(entry_bytes),
},
);
fbb.finish(sequenced_entry, None);
let (mut data, idx) = fbb.collapse();
let result = OwnedSequencedEntry::try_from(data.split_off(idx));
assert!(
matches!(result, Err(SequencedEntryError::InvalidServerId { .. })),
"result was {:?}",
result
);
}
#[test]
fn validate_sequenced_entry_clock_value() {
let lp = vec![
"a,host=a val=23i 983",
"a,host=a,region=west val2=23.2 2343",
"a val=21i,bool=true,string=\"hello\" 222",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = lines_to_sharded_entries(
&lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data();
const OVERHEAD: usize = 4 * std::mem::size_of::<u64>();
let mut fbb = FlatBufferBuilder::new_with_capacity(entry_bytes.len() + OVERHEAD);
let entry_bytes = fbb.create_vector_direct(entry_bytes);
let sequenced_entry = entry_fb::SequencedEntry::create(
&mut fbb,
&entry_fb::SequencedEntryArgs {
clock_value: 0, // <----- IMPORTANT PART this is invalid and should error
server_id: 5,
entry_bytes: Some(entry_bytes),
},
);
fbb.finish(sequenced_entry, None);
let (mut data, idx) = fbb.collapse();
let result = OwnedSequencedEntry::try_from(data.split_off(idx));
assert!(
matches!(result, Err(SequencedEntryError::InvalidClockValue { .. })),
"result was {:?}",
result
);
}
}

View File

@@ -2280,339 +2280,6 @@ pub mod influxdata {
ds.finish()
}
}
pub enum SegmentOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct Segment<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for Segment<'a> {
type Inner = Segment<'a>;
#[inline]
fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table { buf, loc },
}
}
}
impl<'a> Segment<'a> {
#[inline]
pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
Segment { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>,
args: &'args SegmentArgs<'args>,
) -> flatbuffers::WIPOffset<Segment<'bldr>> {
let mut builder = SegmentBuilder::new(_fbb);
builder.add_consistency_high_water_clock(args.consistency_high_water_clock);
builder.add_id(args.id);
if let Some(x) = args.entries {
builder.add_entries(x);
}
builder.add_server_id(args.server_id);
builder.finish()
}
pub const VT_ID: flatbuffers::VOffsetT = 4;
pub const VT_SERVER_ID: flatbuffers::VOffsetT = 6;
pub const VT_CONSISTENCY_HIGH_WATER_CLOCK: flatbuffers::VOffsetT = 8;
pub const VT_ENTRIES: flatbuffers::VOffsetT = 10;
#[inline]
pub fn id(&self) -> u64 {
self._tab.get::<u64>(Segment::VT_ID, Some(0)).unwrap()
}
#[inline]
pub fn server_id(&self) -> u32 {
self._tab
.get::<u32>(Segment::VT_SERVER_ID, Some(0))
.unwrap()
}
#[inline]
pub fn consistency_high_water_clock(&self) -> u64 {
self._tab
.get::<u64>(Segment::VT_CONSISTENCY_HIGH_WATER_CLOCK, Some(0))
.unwrap()
}
#[inline]
pub fn entries(
&self,
) -> Option<
flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<SequencedEntry<'a>>>,
> {
self._tab.get::<flatbuffers::ForwardsUOffset<
flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<SequencedEntry>>,
>>(Segment::VT_ENTRIES, None)
}
}
impl flatbuffers::Verifiable for Segment<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<u64>(&"id", Self::VT_ID, false)?
.visit_field::<u32>(&"server_id", Self::VT_SERVER_ID, false)?
.visit_field::<u64>(
&"consistency_high_water_clock",
Self::VT_CONSISTENCY_HIGH_WATER_CLOCK,
false,
)?
.visit_field::<flatbuffers::ForwardsUOffset<
flatbuffers::Vector<
'_,
flatbuffers::ForwardsUOffset<SequencedEntry>,
>,
>>(&"entries", Self::VT_ENTRIES, false)?
.finish();
Ok(())
}
}
pub struct SegmentArgs<'a> {
pub id: u64,
pub server_id: u32,
pub consistency_high_water_clock: u64,
pub entries: Option<
flatbuffers::WIPOffset<
flatbuffers::Vector<
'a,
flatbuffers::ForwardsUOffset<SequencedEntry<'a>>,
>,
>,
>,
}
impl<'a> Default for SegmentArgs<'a> {
#[inline]
fn default() -> Self {
SegmentArgs {
id: 0,
server_id: 0,
consistency_high_water_clock: 0,
entries: None,
}
}
}
pub struct SegmentBuilder<'a: 'b, 'b> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b> SegmentBuilder<'a, 'b> {
#[inline]
pub fn add_id(&mut self, id: u64) {
self.fbb_.push_slot::<u64>(Segment::VT_ID, id, 0);
}
#[inline]
pub fn add_server_id(&mut self, server_id: u32) {
self.fbb_
.push_slot::<u32>(Segment::VT_SERVER_ID, server_id, 0);
}
#[inline]
pub fn add_consistency_high_water_clock(
&mut self,
consistency_high_water_clock: u64,
) {
self.fbb_.push_slot::<u64>(
Segment::VT_CONSISTENCY_HIGH_WATER_CLOCK,
consistency_high_water_clock,
0,
);
}
#[inline]
pub fn add_entries(
&mut self,
entries: flatbuffers::WIPOffset<
flatbuffers::Vector<
'b,
flatbuffers::ForwardsUOffset<SequencedEntry<'b>>,
>,
>,
) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(
Segment::VT_ENTRIES,
entries,
);
}
#[inline]
pub fn new(
_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
) -> SegmentBuilder<'a, 'b> {
let start = _fbb.start_table();
SegmentBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<Segment<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl std::fmt::Debug for Segment<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Segment");
ds.field("id", &self.id());
ds.field("server_id", &self.server_id());
ds.field(
"consistency_high_water_clock",
&self.consistency_high_water_clock(),
);
ds.field("entries", &self.entries());
ds.finish()
}
}
pub enum SequencedEntryOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct SequencedEntry<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for SequencedEntry<'a> {
type Inner = SequencedEntry<'a>;
#[inline]
fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table { buf, loc },
}
}
}
impl<'a> SequencedEntry<'a> {
#[inline]
pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
SequencedEntry { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>,
args: &'args SequencedEntryArgs<'args>,
) -> flatbuffers::WIPOffset<SequencedEntry<'bldr>> {
let mut builder = SequencedEntryBuilder::new(_fbb);
builder.add_clock_value(args.clock_value);
if let Some(x) = args.entry_bytes {
builder.add_entry_bytes(x);
}
builder.add_server_id(args.server_id);
builder.finish()
}
pub const VT_CLOCK_VALUE: flatbuffers::VOffsetT = 4;
pub const VT_SERVER_ID: flatbuffers::VOffsetT = 6;
pub const VT_ENTRY_BYTES: flatbuffers::VOffsetT = 8;
#[inline]
pub fn clock_value(&self) -> u64 {
self._tab
.get::<u64>(SequencedEntry::VT_CLOCK_VALUE, Some(0))
.unwrap()
}
#[inline]
pub fn server_id(&self) -> u32 {
self._tab
.get::<u32>(SequencedEntry::VT_SERVER_ID, Some(0))
.unwrap()
}
#[inline]
pub fn entry_bytes(&self) -> Option<&'a [u8]> {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(
SequencedEntry::VT_ENTRY_BYTES,
None,
)
.map(|v| v.safe_slice())
}
}
impl flatbuffers::Verifiable for SequencedEntry<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<u64>(&"clock_value", Self::VT_CLOCK_VALUE, false)?
.visit_field::<u32>(&"server_id", Self::VT_SERVER_ID, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(&"entry_bytes", Self::VT_ENTRY_BYTES, false)?
.finish();
Ok(())
}
}
pub struct SequencedEntryArgs<'a> {
pub clock_value: u64,
pub server_id: u32,
pub entry_bytes: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for SequencedEntryArgs<'a> {
#[inline]
fn default() -> Self {
SequencedEntryArgs {
clock_value: 0,
server_id: 0,
entry_bytes: None,
}
}
}
pub struct SequencedEntryBuilder<'a: 'b, 'b> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b> SequencedEntryBuilder<'a, 'b> {
#[inline]
pub fn add_clock_value(&mut self, clock_value: u64) {
self.fbb_
.push_slot::<u64>(SequencedEntry::VT_CLOCK_VALUE, clock_value, 0);
}
#[inline]
pub fn add_server_id(&mut self, server_id: u32) {
self.fbb_
.push_slot::<u32>(SequencedEntry::VT_SERVER_ID, server_id, 0);
}
#[inline]
pub fn add_entry_bytes(
&mut self,
entry_bytes: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>,
) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(
SequencedEntry::VT_ENTRY_BYTES,
entry_bytes,
);
}
#[inline]
pub fn new(
_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
) -> SequencedEntryBuilder<'a, 'b> {
let start = _fbb.start_table();
SequencedEntryBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<SequencedEntry<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl std::fmt::Debug for SequencedEntry<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("SequencedEntry");
ds.field("clock_value", &self.clock_value());
ds.field("server_id", &self.server_id());
ds.field("entry_bytes", &self.entry_bytes());
ds.finish()
}
}
} // pub mod v1
} // pub mod write
} // pub mod iox

View File

@@ -32,58 +32,6 @@ message PartitionTemplate {
repeated Part parts = 1;
}
message WriteBufferConfig {
enum Rollover {
ROLLOVER_UNSPECIFIED = 0;
// Drop the old segment even though it hasn't been persisted. This part of
// the Write Buffer will be lost on this server.
ROLLOVER_DROP_OLD_SEGMENT = 1;
// Drop the incoming write and fail silently. This favors making sure that
// older Write Buffer data will be backed up.
ROLLOVER_DROP_INCOMING = 2;
// Reject the incoming write and return an error. The client may retry the
// request, which will succeed once the oldest segment has been
// persisted to object storage.
ROLLOVER_RETURN_ERROR = 3;
}
// The size the Write Buffer should be limited to. Once the buffer gets to
// this size it will drop old segments to remain below this size, but
// still try to hold as much in memory as possible while remaining
// below this threshold
uint64 buffer_size = 1;
// Write Buffer segments become read-only after crossing over this size,
// which means that segments will always be >= this size. When old segments
// are dropped from memory, at least this much space will be freed from
// the buffer.
uint64 segment_size = 2;
// What should happen if a write comes in that would exceed the Write Buffer
// size and the oldest segment that could be dropped hasn't yet been
// persisted to object storage. If the oldest segment has been
// persisted, then it will be dropped from the buffer so that new writes
// can be accepted. This option is only for defining the behavior of what
// happens if that segment hasn't been persisted. If set to return an
// error, new writes will be rejected until the oldest segment has been
// persisted so that it can be cleared from memory. Alternatively, this
// can be set so that old segments are dropped even if they haven't been
// persisted. This setting is also useful for cases where persistence
// isn't being used and this is only for in-memory buffering.
Rollover buffer_rollover = 3;
// If set to true, buffer segments will be written to object storage.
bool persist_segments = 4;
// If set, segments will be rolled over after this period of time even
// if they haven't hit the size threshold. This allows them to be written
// out to object storage as they must be immutable first.
google.protobuf.Duration close_segment_after = 5;
}
message LifecycleRules {
message SortOrder {
message ColumnSort {
@@ -174,9 +122,6 @@ message DatabaseRules {
// Configures how data flows through the system
LifecycleRules lifecycle_rules = 3;
// Write Buffer configuration for this database
WriteBufferConfig write_buffer_config = 6;
oneof routing_rules {
// Shard config
ShardConfig shard_config = 8;

View File

@@ -14,14 +14,12 @@ use crate::influxdata::iox::management::v1 as management;
mod lifecycle;
mod partition;
mod shard;
mod write_buffer;
impl From<DatabaseRules> for management::DatabaseRules {
fn from(rules: DatabaseRules) -> Self {
Self {
name: rules.name.into(),
partition_template: Some(rules.partition_template.into()),
write_buffer_config: rules.write_buffer_config.map(Into::into),
lifecycle_rules: Some(rules.lifecycle_rules.into()),
routing_rules: rules.routing_rules.map(Into::into),
worker_cleanup_avg_sleep: Some(rules.worker_cleanup_avg_sleep.into()),
@@ -35,8 +33,6 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> {
let name = DatabaseName::new(proto.name.clone()).field("name")?;
let write_buffer_config = proto.write_buffer_config.optional("write_buffer_config")?;
let lifecycle_rules = proto
.lifecycle_rules
.optional("lifecycle_rules")?
@@ -60,7 +56,6 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
Ok(Self {
name,
partition_template,
write_buffer_config,
lifecycle_rules,
routing_rules,
worker_cleanup_avg_sleep,
@@ -239,7 +234,6 @@ mod tests {
assert_eq!(back.lifecycle_rules, Some(LifecycleRules::default().into()));
// These should be none as preserved on non-protobuf DatabaseRules
assert!(back.write_buffer_config.is_none());
assert!(back.routing_rules.is_none());
}
}

View File

@@ -1,119 +0,0 @@
use std::convert::{TryFrom, TryInto};
use data_types::database_rules::{WriteBufferConfig, WriteBufferRollover};
use crate::google::{FieldViolation, FromField};
use crate::influxdata::iox::management::v1 as management;
impl From<WriteBufferConfig> for management::WriteBufferConfig {
fn from(rollover: WriteBufferConfig) -> Self {
let buffer_rollover: management::write_buffer_config::Rollover =
rollover.buffer_rollover.into();
Self {
buffer_size: rollover.buffer_size as u64,
segment_size: rollover.segment_size as u64,
buffer_rollover: buffer_rollover as _,
persist_segments: rollover.store_segments,
close_segment_after: rollover.close_segment_after.map(Into::into),
}
}
}
impl TryFrom<management::WriteBufferConfig> for WriteBufferConfig {
type Error = FieldViolation;
fn try_from(proto: management::WriteBufferConfig) -> Result<Self, Self::Error> {
let buffer_rollover = proto.buffer_rollover().scope("buffer_rollover")?;
let close_segment_after = proto
.close_segment_after
.map(TryInto::try_into)
.transpose()
.map_err(|_| FieldViolation {
field: "closeSegmentAfter".to_string(),
description: "Duration must be positive".to_string(),
})?;
Ok(Self {
buffer_size: proto.buffer_size as usize,
segment_size: proto.segment_size as usize,
buffer_rollover,
store_segments: proto.persist_segments,
close_segment_after,
})
}
}
impl From<WriteBufferRollover> for management::write_buffer_config::Rollover {
fn from(rollover: WriteBufferRollover) -> Self {
match rollover {
WriteBufferRollover::DropOldSegment => Self::DropOldSegment,
WriteBufferRollover::DropIncoming => Self::DropIncoming,
WriteBufferRollover::ReturnError => Self::ReturnError,
}
}
}
impl TryFrom<management::write_buffer_config::Rollover> for WriteBufferRollover {
type Error = FieldViolation;
fn try_from(proto: management::write_buffer_config::Rollover) -> Result<Self, Self::Error> {
use management::write_buffer_config::Rollover;
Ok(match proto {
Rollover::Unspecified => return Err(FieldViolation::required("")),
Rollover::DropOldSegment => Self::DropOldSegment,
Rollover::DropIncoming => Self::DropIncoming,
Rollover::ReturnError => Self::ReturnError,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_write_buffer_config_default() {
let protobuf: management::WriteBufferConfig = Default::default();
let res: Result<WriteBufferConfig, _> = protobuf.try_into();
let err = res.expect_err("expected failure");
assert_eq!(&err.field, "buffer_rollover");
assert_eq!(&err.description, "Field is required");
}
#[test]
fn test_write_buffer_config_rollover() {
let protobuf = management::WriteBufferConfig {
buffer_rollover: management::write_buffer_config::Rollover::DropIncoming as _,
..Default::default()
};
let config: WriteBufferConfig = protobuf.clone().try_into().unwrap();
let back: management::WriteBufferConfig = config.clone().into();
assert_eq!(config.buffer_rollover, WriteBufferRollover::DropIncoming);
assert_eq!(protobuf, back);
}
#[test]
fn test_write_buffer_config_negative_duration() {
use crate::google::protobuf::Duration;
let protobuf = management::WriteBufferConfig {
buffer_rollover: management::write_buffer_config::Rollover::DropOldSegment as _,
close_segment_after: Some(Duration {
seconds: -1,
nanos: -40,
}),
..Default::default()
};
let res: Result<WriteBufferConfig, _> = protobuf.try_into();
let err = res.expect_err("expected failure");
assert_eq!(&err.field, "closeSegmentAfter");
assert_eq!(&err.description, "Duration must be positive");
}
}

View File

@@ -80,7 +80,7 @@ impl Client {
/// An Entry unit of write payload encoded as Flatbuffer structure
/// and passed as a bytes field in the gRPC protobuf API.
///
/// [Entry]: https://github.com/influxdata/influxdb_iox/blob/main/generated_types/protos/influxdata/iox/write/v1/entry.fbs
/// [Entry]: https://github.com/influxdata/influxdb_iox/blob/main/entry/src/entry.fbs
pub async fn write_entry(
&mut self,
db_name: impl Into<String>,

View File

@@ -1,747 +0,0 @@
//! This module contains code for managing the Write Buffer
use data_types::{database_rules::WriteBufferRollover, server_id::ServerId, DatabaseName};
use entry::{ClockValue, Segment as EntrySegment, SequencedEntry};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use std::{convert::TryInto, mem, sync::Arc};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use crc32fast::Hasher;
use data_types::database_rules::WriteBufferConfig;
use data_types::write_buffer::SegmentPersistence;
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use snafu::{ensure, ResultExt, Snafu};
use tracker::{TaskRegistration, TrackedFutureExt};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Max size limit hit {}", size))]
MaxSizeLimit { size: u64 },
#[snafu(display(
"Unable to drop segment to reduce size below max. Current size {}, segment count {}",
size,
segment_count
))]
UnableToDropSegment { size: usize, segment_count: usize },
#[snafu(display(
"Sequence from server {} out of order. Current: {}, incomming {}",
server,
current_sequence,
incoming_sequence,
))]
SequenceOutOfOrder {
server: ServerId,
current_sequence: u64,
incoming_sequence: u64,
},
#[snafu(display("segment id must be between [1, 1,000,000,000)"))]
SegmentIdOutOfBounds,
#[snafu(display("unable to compress segment id {}: {}", segment_id, source))]
UnableToCompressData {
segment_id: u64,
source: snap::Error,
},
#[snafu(display("unable to decompress segment data: {}", source))]
UnableToDecompressData { source: snap::Error },
#[snafu(display("unable to read checksum: {}", source))]
UnableToReadChecksum {
source: std::array::TryFromSliceError,
},
#[snafu(display("checksum mismatch for segment"))]
ChecksumMismatch,
#[snafu(display("the flatbuffers Segment is invalid: {}", source))]
InvalidFlatbuffersSegment {
source: flatbuffers::InvalidFlatbuffer,
},
#[snafu(display("the flatbuffers size is too small; only found {} bytes", bytes))]
FlatbuffersSegmentTooSmall { bytes: usize },
#[snafu(display("flatbuffers for segment invalid: {}", source))]
FlatbuffersInvalid { source: entry::SequencedEntryError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// An in-memory buffer for the Write Buffer. It is split up into segments,
/// which can be persisted to object storage.
#[derive(Debug)]
pub struct Buffer {
max_size: usize,
current_size: usize,
segment_size: usize,
pub persist: bool,
open_segment: Segment,
closed_segments: Vec<Arc<Segment>>,
rollover_behavior: WriteBufferRollover,
highwater_clock: Option<ClockValue>,
server_id: ServerId,
}
impl Buffer {
pub fn new(
max_size: usize,
segment_size: usize,
rollover_behavior: WriteBufferRollover,
persist: bool,
server_id: ServerId,
) -> Self {
Self {
max_size,
segment_size,
persist,
rollover_behavior,
open_segment: Segment::new(1, server_id),
current_size: 0,
closed_segments: vec![],
highwater_clock: None,
server_id,
}
}
pub fn new_from_config(config: &WriteBufferConfig, server_id: ServerId) -> Self {
Self::new(
config.buffer_size,
config.segment_size,
config.buffer_rollover,
config.store_segments,
server_id,
)
}
/// Appends a `SequencedEntry` onto the buffer, returning the segment if it
/// has been closed out. If the max size of the buffer would be exceeded
/// by accepting the write, the oldest (first) of the closed segments
/// will be dropped if it has been persisted. Otherwise, the configured rollover
/// behavior determines whether the oldest segment or the incoming write is
/// dropped, or an error is returned.
pub fn append(&mut self, write: Arc<dyn SequencedEntry>) -> Result<Option<Arc<Segment>>> {
let write_size = write.size();
while self.current_size + write_size > self.max_size {
let oldest_is_persisted = match self.closed_segments.get(0) {
Some(s) => s.persisted().is_some(),
None => false,
};
if oldest_is_persisted {
self.remove_oldest_segment();
continue;
}
match self.rollover_behavior {
WriteBufferRollover::DropIncoming => {
warn!(
"Write Buffer is full, dropping incoming write \
for current segment (segment id: {:?})",
self.open_segment.id,
);
return Ok(None);
}
WriteBufferRollover::DropOldSegment => {
let oldest_segment_id = self.remove_oldest_segment();
warn!(
"Write Buffer is full, dropping oldest segment (segment id: {:?})",
oldest_segment_id
);
}
WriteBufferRollover::ReturnError => {
return UnableToDropSegment {
size: self.current_size,
segment_count: self.closed_segments.len(),
}
.fail();
}
}
}
let mut closed_segment = None;
self.current_size += write_size;
self.open_segment.append(write);
if self.open_segment.size > self.segment_size {
let next_id = self.open_segment.id + 1;
let segment = mem::replace(
&mut self.open_segment,
Segment::new(next_id, self.server_id),
);
let segment = Arc::new(segment);
self.closed_segments.push(Arc::clone(&segment));
closed_segment = Some(segment);
}
Ok(closed_segment)
}
/// Returns the current size of the buffer.
pub fn size(&self) -> usize {
self.current_size
}
/// Returns any writes after the passed in `ClockValue`
pub fn writes_since(&self, since: ClockValue) -> Vec<Arc<dyn SequencedEntry>> {
let mut writes = Vec::new();
// start with the newest writes and go back. Hopefully they're asking for
// something recent.
for w in self.open_segment.writes.iter().rev() {
if w.clock_value() <= since {
writes.reverse();
return writes;
}
writes.push(Arc::clone(w));
}
for s in self.closed_segments.iter().rev() {
for w in s.writes.iter().rev() {
if w.clock_value() == since {
writes.reverse();
return writes;
}
writes.push(Arc::clone(w));
}
}
writes.reverse();
writes
}
/// Removes the oldest segment present in the buffer, returning its id
fn remove_oldest_segment(&mut self) -> u64 {
let removed_segment = self.closed_segments.remove(0);
self.current_size -= removed_segment.size;
removed_segment.id
}
}
/// Segment is a collection of sequenced entries that can be persisted to
/// object store.
#[derive(Debug)]
pub struct Segment {
pub(crate) id: u64,
size: usize,
pub writes: Vec<Arc<dyn SequencedEntry>>,
// Time this segment was initialized
created_at: DateTime<Utc>,
// Persistence metadata if segment is persisted
persisted: Mutex<Option<SegmentPersistence>>,
server_id: ServerId,
highwater_clock: Option<ClockValue>,
}
impl Segment {
fn new(id: u64, server_id: ServerId) -> Self {
Self {
id,
size: 0,
writes: vec![],
created_at: Utc::now(),
persisted: Mutex::new(None),
server_id,
highwater_clock: None,
}
}
// appends the write to the segment
fn append(&mut self, write: Arc<dyn SequencedEntry>) {
self.size += write.size();
self.writes.push(write);
}
/// sets the persistence metadata for this segment
pub fn set_persisted(&self, persisted: SegmentPersistence) {
let mut self_persisted = self.persisted.lock();
*self_persisted = Some(persisted);
}
/// returns persistence metadata for this segment if persisted
pub fn persisted(&self) -> Option<SegmentPersistence> {
self.persisted.lock().clone()
}
/// Spawns a tokio task that will continuously try to persist the bytes to
/// the given object store location.
pub fn persist_bytes_in_background(
&self,
tracker: TaskRegistration,
db_name: &DatabaseName<'_>,
store: Arc<ObjectStore>,
) -> Result<()> {
let data = self.to_file_bytes()?;
let location = database_object_store_path(self.server_id, db_name, &store);
let location = object_store_path_for_segment(&location, self.id)?;
let len = data.len();
let mut stream_data = std::io::Result::Ok(data.clone());
tokio::task::spawn(
async move {
while let Err(err) = store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
{
error!("error writing bytes to store: {}", err);
tokio::time::sleep(tokio::time::Duration::from_secs(
super::STORE_ERROR_PAUSE_SECONDS,
))
.await;
stream_data = std::io::Result::Ok(data.clone());
}
// TODO: Mark segment as persisted
info!("persisted data to {}", location.display());
}
.track(tracker),
);
Ok(())
}
/// serialize the segment into the bytes that represent it in a file. This
/// compresses the flatbuffers payload and writes a crc32 checksum at
/// the end. The Segment will get the server ID this Buffer is on and
/// the highwater clock value for the last consistency check of this Buffer.
pub fn to_file_bytes(&self) -> Result<Bytes> {
let segment = EntrySegment::new_from_entries(
self.id,
self.server_id,
self.highwater_clock,
&self.writes,
);
let mut encoder = snap::raw::Encoder::new();
let mut compressed_data =
encoder
.compress_vec(segment.data())
.context(UnableToCompressData {
segment_id: self.id,
})?;
let mut hasher = Hasher::new();
hasher.update(&compressed_data);
let checksum = hasher.finalize();
compressed_data.extend_from_slice(&checksum.to_le_bytes());
Ok(Bytes::from(compressed_data))
}
/// checks the crc32 for the compressed data, decompresses it and
/// deserializes it into a Segment from the `entry` crate.
pub fn entry_segment_from_file_bytes(data: &[u8]) -> Result<EntrySegment> {
if data.len() < std::mem::size_of::<u32>() {
return FlatbuffersSegmentTooSmall { bytes: data.len() }.fail();
}
let (data, checksum) = data.split_at(data.len() - std::mem::size_of::<u32>());
let checksum = u32::from_le_bytes(checksum.try_into().context(UnableToReadChecksum)?);
let mut hasher = Hasher::new();
hasher.update(&data);
if checksum != hasher.finalize() {
return Err(Error::ChecksumMismatch);
}
let mut decoder = snap::raw::Decoder::new();
let data = decoder
.decompress_vec(data)
.context(UnableToDecompressData)?;
data.try_into().context(FlatbuffersInvalid)
}
}
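A minimal round-trip sketch for the segment file format described above, using only methods defined in this impl (`segment` is assumed to be an existing `Segment` with some appended writes):

```rust
// Compress + checksum on the way out, verify + decompress on the way back in.
let bytes = segment.to_file_bytes().unwrap();
let entry_segment = Segment::entry_segment_from_file_bytes(&bytes).unwrap();
assert!(entry_segment.fb().entries().is_some());
```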
const WRITE_BUFFER_DIR: &str = "wb";
const MAX_SEGMENT_ID: u64 = 999_999_999;
const SEGMENT_FILE_EXTENSION: &str = ".segment";
/// Builds the path for a given segment id, given the root object store path.
/// The path should be where the root of the database is (e.g. 1/my_db/).
fn object_store_path_for_segment<P: ObjectStorePath>(root_path: &P, segment_id: u64) -> Result<P> {
ensure!(
segment_id < MAX_SEGMENT_ID && segment_id > 0,
SegmentIdOutOfBounds
);
let millions_place = segment_id / 1_000_000;
let millions = millions_place * 1_000_000;
let thousands_place = (segment_id - millions) / 1_000;
let thousands = thousands_place * 1_000;
let hundreds_place = segment_id - millions - thousands;
let mut path = root_path.clone();
path.push_all_dirs(&[
WRITE_BUFFER_DIR,
&format!("{:03}", millions_place),
&format!("{:03}", thousands_place),
]);
path.set_file_name(format!("{:03}{}", hundreds_place, SEGMENT_FILE_EXTENSION));
Ok(path)
}
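A worked example of the layout this produces (the segment id is decomposed base-1000; the `root_path` value is assumed):

```rust
// segment_id = 1_234_567:
//   millions_place  = 1   -> directory "001"
//   thousands_place = 234 -> directory "234"
//   hundreds_place  = 567 -> file name "567.segment"
// Relative to a database root of "1/my_db/", the result is
// 1/my_db/wb/001/234/567.segment
let path = object_store_path_for_segment(&root_path, 1_234_567).unwrap();
```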
// base location in object store for a given database name
fn database_object_store_path(
server_id: ServerId,
database_name: &DatabaseName<'_>,
store: &ObjectStore,
) -> object_store::path::Path {
let mut path = store.new_path();
path.push_dir(format!("{}", server_id));
path.push_dir(database_name.to_string());
path
}
#[cfg(test)]
mod tests {
use super::*;
use entry::{test_helpers::lp_to_sequenced_entry as lp_2_se, SequencedEntry};
use object_store::memory::InMemory;
use std::{convert::TryFrom, ops::Deref};
#[test]
fn append_increments_current_size_and_uses_existing_segment() {
let max = 1 << 32;
let segment = 1 << 16;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let size = write.size();
assert_eq!(0, buf.size());
let segment = buf.append(write).unwrap();
assert_eq!(size, buf.size());
assert!(segment.is_none());
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
assert_eq!(size * 2, buf.size());
assert!(segment.is_none());
}
#[test]
fn append_rolls_over_segment() {
let max = 1 << 16;
let segment = 1;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(segment.id, 1);
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(segment.id, 2);
}
#[test]
fn drops_persisted_segment_when_over_size() {
let max = 600;
let segment = 1;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap().unwrap();
assert_eq!(1, segment.id);
assert!(segment.persisted().is_none());
segment.set_persisted(SegmentPersistence {
location: "PLACEHOLDER".to_string(),
time: Utc::now(),
});
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(2, segment.id);
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
let write = lp_to_sequenced_entry(server_id, 3, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(3, segment.id);
assert!(segment.persisted().is_none());
assert_eq!(2, buf.closed_segments.len());
assert_eq!(2, buf.closed_segments[0].id);
assert_eq!(3, buf.closed_segments[1].id);
}
#[test]
fn drops_old_segment_even_if_not_persisted() {
let max = 600;
let segment = 1;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::DropOldSegment,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap().unwrap();
assert_eq!(1, segment.id);
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(2, segment.id);
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
let write = lp_to_sequenced_entry(server_id, 3, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(3, segment.id);
assert_eq!(2, buf.closed_segments.len());
assert_eq!(2, buf.closed_segments[0].id);
assert_eq!(3, buf.closed_segments[1].id);
}
#[test]
fn drops_incoming_write_if_oldest_segment_not_persisted() {
let max = 600;
let segment = 1;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::DropIncoming,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap().unwrap();
assert_eq!(1, segment.id);
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(2, segment.id);
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
let write = lp_to_sequenced_entry(server_id, 3, "cpu val=1 10");
let segment = buf.append(write).unwrap();
assert!(segment.is_none());
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
}
#[test]
fn returns_error_if_oldest_segment_not_persisted() {
let max = 600;
let segment = 1;
let server_id = ServerId::try_from(1).unwrap();
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
);
let write = lp_to_sequenced_entry(server_id, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap().unwrap();
assert_eq!(1, segment.id);
let write = lp_to_sequenced_entry(server_id, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(2, segment.id);
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
let write = lp_to_sequenced_entry(server_id, 3, "cpu val=1 10");
assert!(buf.append(write).is_err());
assert_eq!(2, buf.closed_segments.len());
assert_eq!(1, buf.closed_segments[0].id);
assert_eq!(2, buf.closed_segments[1].id);
}
fn equal_to_server_id_and_clock_value(
sequenced_entry: &dyn SequencedEntry,
expected_server_id: ServerId,
expected_clock_value: u64,
) {
assert_eq!(sequenced_entry.server_id(), expected_server_id);
assert_eq!(
sequenced_entry.clock_value(),
ClockValue::try_from(expected_clock_value).unwrap()
);
}
#[test]
fn writes_since() {
let max = 1 << 63;
let server_id1 = ServerId::try_from(1).unwrap();
let server_id2 = ServerId::try_from(2).unwrap();
let write = lp_to_sequenced_entry(server_id1, 1, "cpu val=1 10");
let segment = write.size() + 1;
let mut buf = Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id1,
);
let segment = buf.append(write).unwrap();
assert!(segment.is_none());
let write = lp_to_sequenced_entry(server_id2, 1, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(1, segment.id);
let write = lp_to_sequenced_entry(server_id1, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
assert!(segment.is_none());
let write = lp_to_sequenced_entry(server_id1, 3, "cpu val=1 10");
let segment = buf.append(write).unwrap();
let segment = segment.unwrap();
assert_eq!(2, segment.id);
let write = lp_to_sequenced_entry(server_id2, 2, "cpu val=1 10");
let segment = buf.append(write).unwrap();
assert!(segment.is_none());
let writes = buf.writes_since(ClockValue::try_from(1).unwrap());
assert_eq!(3, writes.len());
equal_to_server_id_and_clock_value(writes[0].deref(), server_id1, 2);
equal_to_server_id_and_clock_value(writes[1].deref(), server_id1, 3);
equal_to_server_id_and_clock_value(writes[2].deref(), server_id2, 2);
}
#[test]
fn valid_object_store_path_for_segment() {
let storage = ObjectStore::new_in_memory(InMemory::new());
let mut base_path = storage.new_path();
base_path.push_all_dirs(&["1", "mydb"]);
let segment_path = object_store_path_for_segment(&base_path, 23).unwrap();
let mut expected_segment_path = base_path.clone();
expected_segment_path.push_all_dirs(&["wb", "000", "000"]);
expected_segment_path.set_file_name("023.segment");
assert_eq!(segment_path, expected_segment_path);
let segment_path = object_store_path_for_segment(&base_path, 20_003).unwrap();
let mut expected_segment_path = base_path.clone();
expected_segment_path.push_all_dirs(&["wb", "000", "020"]);
expected_segment_path.set_file_name("003.segment");
assert_eq!(segment_path, expected_segment_path);
let segment_path = object_store_path_for_segment(&base_path, 45_010_105).unwrap();
let mut expected_segment_path = base_path;
expected_segment_path.push_all_dirs(&["wb", "045", "010"]);
expected_segment_path.set_file_name("105.segment");
assert_eq!(segment_path, expected_segment_path);
}
#[test]
fn object_store_path_for_segment_out_of_bounds() {
let storage = ObjectStore::new_in_memory(InMemory::new());
let mut base_path = storage.new_path();
base_path.push_all_dirs(&["1", "mydb"]);
let segment_path = object_store_path_for_segment(&base_path, 0).err().unwrap();
assert!(matches!(segment_path, Error::SegmentIdOutOfBounds));
let segment_path = object_store_path_for_segment(&base_path, 23_000_000_000)
.err()
.unwrap();
assert!(matches!(segment_path, Error::SegmentIdOutOfBounds));
}
#[test]
fn segment_serialize_deserialize() {
let server_id = ServerId::try_from(1).unwrap();
let id = 1;
let mut segment = Segment::new(id, server_id);
let entry1 = lp_to_sequenced_entry(server_id, 1, "foo val=1 123");
segment.append(Arc::clone(&entry1));
let entry2 = lp_to_sequenced_entry(server_id, 2, "foo val=2 124");
segment.append(Arc::clone(&entry2));
let data = segment.to_file_bytes().unwrap();
let recovered_segment = Segment::entry_segment_from_file_bytes(&data).unwrap();
let recovered_entries: Vec<_> = recovered_segment.fb().entries().unwrap().iter().collect();
assert_eq!(recovered_entries.len(), 2);
assert_eq!(recovered_entries[0].server_id(), 1);
assert_eq!(recovered_entries[0].clock_value(), 1);
assert_eq!(
recovered_entries[0].entry_bytes().unwrap(),
entry1.fb().entry_bytes().unwrap()
);
assert_eq!(recovered_entries[1].server_id(), 1);
assert_eq!(recovered_entries[1].clock_value(), 2);
assert_eq!(
recovered_entries[1].entry_bytes().unwrap(),
entry2.fb().entry_bytes().unwrap()
);
}
fn lp_to_sequenced_entry(
server_id: ServerId,
clock_value: u64,
line_protocol: &str,
) -> Arc<dyn SequencedEntry> {
Arc::new(lp_2_se(line_protocol, server_id.get_u32(), clock_value))
}
}

View File

@@ -150,16 +150,11 @@ impl Config {
return;
}
let write_buffer = rules
.write_buffer_config
.as_ref()
.map(|config| crate::buffer::Buffer::new_from_config(config, server_id));
let db = Arc::new(Db::new(
rules,
server_id,
object_store,
exec,
write_buffer,
Arc::clone(&self.jobs),
preserved_catalog,
));

View File

@@ -3,11 +3,7 @@
use self::access::QueryCatalogAccess;
use self::catalog::TableNameFilter;
use super::{
buffer::{self, Buffer},
JobRegistry,
};
use super::JobRegistry;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use async_trait::async_trait;
use catalog::{chunk::Chunk as CatalogChunk, Catalog};
@@ -23,7 +19,7 @@ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
physical_plan::SendableRecordBatchStream,
};
use entry::{Entry, OwnedSequencedEntry, SequencedEntry};
use entry::Entry;
use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use lifecycle::LifecycleManager;
use metrics::{KeyValue, MetricRegistry};
@@ -32,7 +28,7 @@ use mutable_buffer::chunk::{
};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info};
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
chunk::{Chunk as ParquetChunk, ChunkMetrics as ParquetChunkMetrics},
@@ -198,9 +194,6 @@ pub enum Error {
#[snafu(display("Error building sequenced entry: {}", source))]
SequencedEntryError { source: entry::Error },
#[snafu(display("Error sending Sequenced Entry to Write Buffer: {}", source))]
WriteBufferError { source: buffer::Error },
#[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
TransactionError {
source: parquet_file::catalog::Error,
@@ -286,11 +279,6 @@ pub struct Db {
/// - The Parquet Buffer where chunks are backed by Parquet file data.
preserved_catalog: PreservedCatalog<Catalog>,
/// The Write Buffer holds sequenced entries in an append in-memory
/// buffer. This buffer is used for sending data to subscribers
/// and to persist segments in object storage for recovery.
pub write_buffer: Option<Mutex<Buffer>>,
/// A handle to the global jobs registry for long running tasks
jobs: Arc<JobRegistry>,
@@ -403,7 +391,6 @@ impl Db {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
write_buffer: Option<Buffer>,
jobs: Arc<JobRegistry>,
preserved_catalog: PreservedCatalog<Catalog>,
) -> Self {
@@ -412,7 +399,6 @@ impl Db {
let rules = RwLock::new(rules);
let server_id = server_id;
let store = Arc::clone(&object_store);
let write_buffer = write_buffer.map(Mutex::new);
let metrics_registry = Arc::clone(&preserved_catalog.state().metrics_registry);
let metric_labels = preserved_catalog.state().metric_labels.clone();
@@ -433,7 +419,6 @@ impl Db {
store,
exec,
preserved_catalog,
write_buffer,
jobs,
metrics_registry,
catalog_access,
@@ -986,43 +971,8 @@ impl Db {
info!("finished background worker");
}
/// Stores an entry based on the configuration. The Entry will first be
/// converted into a `SequencedEntry` with the logical clock assigned
/// from the database, and then the `SequencedEntry` will be passed to
/// `store_sequenced_entry`.
/// Stores an entry based on the configuration.
pub fn store_entry(&self, entry: Entry) -> Result<()> {
let sequenced_entry = Arc::new(
OwnedSequencedEntry::new_from_entry_bytes(
self.process_clock.next(),
self.server_id,
entry.data(),
)
.context(SequencedEntryError)?,
);
self.store_sequenced_entry(sequenced_entry)
}
/// Given a `SequencedEntry`:
///
/// - If the write buffer is configured, write the `SequencedEntry` into the buffer, which
/// will replicate the `SequencedEntry` based on the configured rules.
/// - If the mutable buffer is configured, the `SequencedEntry` is then written into the
/// mutable buffer.
///
/// Note that if the write buffer is configured but there is an error storing the
/// `SequencedEntry` in the write buffer, the `SequencedEntry` will *not* reach the mutable
/// buffer.
pub fn store_sequenced_entry(&self, sequenced_entry: Arc<dyn SequencedEntry>) -> Result<()> {
// Send to the write buffer, if configured
if let Some(wb) = &self.write_buffer {
wb.lock()
.append(Arc::clone(&sequenced_entry))
.context(WriteBufferError)?;
}
// Send to the mutable buffer
let rules = self.rules.read();
let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
if rules.lifecycle_rules.immutable {
@@ -1037,7 +987,9 @@ impl Db {
// TODO: Direct writes to closing chunks
if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
let process_clock = self.process_clock.next();
if let Some(partitioned_writes) = entry.partition_writes() {
for write in partitioned_writes {
let partition_key = write.key();
let partition = self
@@ -1058,11 +1010,7 @@ impl Db {
chunk.mutable_buffer().expect("cannot mutate open chunk");
mb_chunk
.write_table_batch(
sequenced_entry.clock_value(),
sequenced_entry.server_id(),
table_batch,
)
.write_table_batch(process_clock, self.server_id, table_batch)
.context(WriteEntry {
partition_key,
chunk_id,
@@ -1088,11 +1036,7 @@ impl Db {
);
mb_chunk
.write_table_batch(
sequenced_entry.clock_value(),
sequenced_entry.server_id(),
table_batch,
)
.write_table_batch(process_clock, self.server_id, table_batch)
.context(WriteEntryInitial { partition_key })?;
let new_chunk = partition
@@ -2687,15 +2631,6 @@ mod tests {
));
}
#[tokio::test]
async fn write_goes_to_write_buffer_if_configured() {
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
write_lp(db.as_ref(), "cpu bar=1 10");
assert_ne!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn lock_tracker_metrics() {
let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));

View File

@@ -72,7 +72,6 @@ fn system_clock_now() -> u64 {
mod tests {
use super::*;
use crate::utils::TestDb;
use entry::test_helpers::lp_to_entry;
use std::{sync::Arc, thread, time::Duration};
#[tokio::test]
@@ -98,59 +97,10 @@ mod tests {
);
}
#[tokio::test]
async fn process_clock_incremented_and_set_on_sequenced_entry() {
let before = system_clock_now();
let before = ClockValue::try_from(before).unwrap();
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).unwrap();
let between = system_clock_now();
let between = ClockValue::try_from(between).unwrap();
let entry = lp_to_entry("cpu foo=2 10");
db.store_entry(entry).unwrap();
let after = system_clock_now();
let after = ClockValue::try_from(after).unwrap();
let sequenced_entries = db
.write_buffer
.as_ref()
.unwrap()
.lock()
.writes_since(before);
assert_eq!(sequenced_entries.len(), 2);
assert!(
sequenced_entries[0].clock_value() < between,
"expected {:?} to be before {:?}",
sequenced_entries[0].clock_value(),
between
);
assert!(
between < sequenced_entries[1].clock_value(),
"expected {:?} to be before {:?}",
between,
sequenced_entries[1].clock_value(),
);
assert!(
sequenced_entries[1].clock_value() < after,
"expected {:?} to be before {:?}",
sequenced_entries[1].clock_value(),
after
);
}
#[tokio::test]
async fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
let db = Arc::new(TestDb::builder().build().await.db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater

View File

@@ -86,7 +86,7 @@ use data_types::{
server_id::ServerId,
{DatabaseName, DatabaseNameError},
};
use entry::{lines_to_sharded_entries, Entry, OwnedSequencedEntry, ShardedEntry};
use entry::{lines_to_sharded_entries, Entry, ShardedEntry};
use influxdb_line_protocol::ParsedLine;
use internal_types::once::OnceNonZeroU32;
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
@@ -106,7 +106,6 @@ use influxdb_iox_client::{connection::Builder, write};
use rand::seq::SliceRandom;
use std::collections::HashMap;
pub mod buffer;
mod config;
pub mod db;
@@ -781,19 +780,6 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
pub async fn handle_sequenced_entry(
&self,
db: &Db,
sequenced_entry: OwnedSequencedEntry,
) -> Result<()> {
db.store_sequenced_entry(Arc::new(sequenced_entry))
.map_err(|e| Error::UnknownDatabaseError {
source: Box::new(e),
})?;
Ok(())
}
pub fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<Db>> {
self.config.db(name)
}
@@ -992,15 +978,6 @@ pub trait RemoteServer {
/// Sends an Entry to the remote server. An IOx server acting as a
/// router/sharder will call this method to send entries to remotes.
async fn write_entry(&self, db: &str, entry: Entry) -> Result<(), ConnectionManagerError>;
/// Sends a SequencedEntry to the remote server. An IOx server acting as a
/// write buffer will call this method to replicate to other write
/// buffer servers or to send data to downstream subscribers.
async fn write_sequenced_entry(
&self,
db: &str,
sequenced_entry: OwnedSequencedEntry,
) -> Result<(), ConnectionManagerError>;
}
/// The connection manager maps a host identifier to a remote server.
@@ -1055,17 +1032,6 @@ impl RemoteServer for RemoteServerImpl {
.await
.context(RemoteServerWriteError)
}
/// Sends a SequencedEntry to the remote server. An IOx server acting as a
/// write buffer will call this method to replicate to other write
/// buffer servers or to send data to downstream subscribers.
async fn write_sequenced_entry(
&self,
_db: &str,
_sequenced_entry: OwnedSequencedEntry,
) -> Result<(), ConnectionManagerError> {
unimplemented!()
}
}
// get bytes from the location in object store
@@ -1197,7 +1163,6 @@ mod tests {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
@@ -1294,7 +1259,6 @@ mod tests {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
@@ -1727,14 +1691,6 @@ mod tests {
self.written.store(true, Ordering::Relaxed);
Ok(())
}
async fn write_sequenced_entry(
&self,
_db: &str,
_sequenced_entry: OwnedSequencedEntry,
) -> Result<(), ConnectionManagerError> {
unimplemented!()
}
}
fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> {

View File

@@ -1,6 +1,6 @@
use data_types::{
chunk_metadata::{ChunkStorage, ChunkSummary},
database_rules::{DatabaseRules, WriteBufferRollover},
database_rules::DatabaseRules,
server_id::ServerId,
DatabaseName,
};
@@ -8,7 +8,6 @@ use object_store::{memory::InMemory, ObjectStore};
use query::{exec::Executor, Database};
use crate::{
buffer::Buffer,
db::{load_or_create_preserved_catalog, Db},
JobRegistry,
};
@@ -33,7 +32,6 @@ pub struct TestDbBuilder {
server_id: Option<ServerId>,
object_store: Option<Arc<ObjectStore>>,
db_name: Option<DatabaseName<'static>>,
write_buffer: bool,
worker_cleanup_avg_sleep: Option<Duration>,
}
@@ -56,19 +54,6 @@ impl TestDbBuilder {
let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
let write_buffer = if self.write_buffer {
let max = 1 << 32;
let segment = 1 << 16;
Some(Buffer::new(
max,
segment,
WriteBufferRollover::ReturnError,
false,
server_id,
))
} else {
None
};
let preserved_catalog = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&object_store),
@@ -92,7 +77,6 @@ impl TestDbBuilder {
server_id,
object_store,
exec,
write_buffer,
Arc::new(JobRegistry::new()),
preserved_catalog,
),
@@ -114,11 +98,6 @@ impl TestDbBuilder {
self
}
pub fn write_buffer(mut self, enabled: bool) -> Self {
self.write_buffer = enabled;
self
}
pub fn worker_cleanup_avg_sleep(mut self, d: Duration) -> Self {
self.worker_cleanup_avg_sleep = Some(d);
self

View File

@@ -204,16 +204,6 @@ async fn test_create_get_update_database() {
part: Some(partition_template::part::Part::Table(Empty {})),
}],
}),
write_buffer_config: Some(WriteBufferConfig {
buffer_size: 24,
segment_size: 2,
buffer_rollover: write_buffer_config::Rollover::DropIncoming as _,
persist_segments: true,
close_segment_after: Some(Duration {
seconds: 324,
nanos: 2,
}),
}),
lifecycle_rules: Some(LifecycleRules {
buffer_size_hard: 553,
sort_order: Some(lifecycle_rules::SortOrder {

View File

@@ -1,22 +0,0 @@
[package]
name = "write_buffer"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
[dependencies] # In alphabetical order
byteorder = "1.3.4"
crc32fast = "1.2.0"
futures = "0.3"
itertools = "0.9.0"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
regex = "1.3.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.44"
snafu = "0.6.6"
snap = "1.0.0"
tokio = { version = "1.0", features=["macros", "fs"] }
observability_deps = { path = "../observability_deps" }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }

View File

@@ -1,749 +0,0 @@
#![deny(broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
//! # Write Buffer
//!
//! This crate provides a local-disk based Write Buffer tailored for
//! InfluxDB IOx `Partition`s.
//!
//! It is not currently connected to anything, but the intent is to
//! give IOx running in standalone mode better durability.
//!
//! Work remaining:
//!
//! - More testing for correctness; the existing tests mostly demonstrate
//! possible usages.
//! - Error handling
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use crc32fast::Hasher;
use once_cell::sync::Lazy;
use regex::Regex;
use snafu::{ensure, ResultExt, Snafu};
use std::{
convert::TryFrom,
ffi::OsStr,
fs::{self, File, OpenOptions},
io::{self, ErrorKind, Read, Seek, SeekFrom, Write},
iter, mem, num,
path::{Path, PathBuf},
};
/// Write Buffer Writer and related utilities
pub mod writer;
/// Opaque public `Error` type
#[derive(Debug, Snafu)]
pub struct Error(InternalError);
/// SequenceNumber is a u64 monotonically increasing number for each Write
/// Buffer entry
pub type SequenceNumber = u64;
#[derive(Debug, Snafu)]
enum InternalError {
UnableToReadFileMetadata {
source: io::Error,
},
UnableToReadSequenceNumber {
source: io::Error,
},
UnableToReadChecksum {
source: io::Error,
},
UnableToReadLength {
source: io::Error,
},
UnableToReadData {
source: io::Error,
},
LengthMismatch {
expected: usize,
actual: usize,
},
ChecksumMismatch {
expected: u32,
actual: u32,
},
ChunkSizeTooLarge {
source: num::TryFromIntError,
actual: usize,
},
UnableToWriteSequenceNumber {
source: io::Error,
},
UnableToWriteChecksum {
source: io::Error,
},
UnableToWriteLength {
source: io::Error,
},
UnableToWriteData {
source: io::Error,
},
UnableToCompressData {
source: snap::Error,
},
UnableToDecompressData {
source: snap::Error,
},
UnableToSync {
source: io::Error,
},
UnableToOpenFile {
source: io::Error,
path: PathBuf,
},
UnableToCreateFile {
source: io::Error,
path: PathBuf,
},
UnableToCopyFileContents {
source: io::Error,
src: PathBuf,
dst: PathBuf,
},
UnableToReadDirectoryContents {
source: io::Error,
path: PathBuf,
},
}
/// A specialized `Result` for Write Buffer-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Build a Write Buffer rooted at a directory.
///
/// May take more configuration options in the future.
#[derive(Debug, Clone)]
pub struct WriteBufferBuilder {
root: PathBuf,
file_rollover_size: u64,
}
impl WriteBufferBuilder {
/// The default size to create new Write Buffer files at. Currently 10MiB.
///
/// See [WriteBufferBuilder::file_rollover_size]
pub const DEFAULT_FILE_ROLLOVER_SIZE_BYTES: u64 = 10 * 1024 * 1024;
/// Create a new Write Buffer rooted at the provided directory on disk.
pub fn new(root: impl Into<PathBuf>) -> Self {
// TODO: Error if `root` is not a directory?
let root = root.into();
Self {
root,
file_rollover_size: Self::DEFAULT_FILE_ROLLOVER_SIZE_BYTES,
}
}
/// Set the size (in bytes) of each Write Buffer file that should prompt
/// a file rollover when it is exceeded.
///
/// File rollover happens per sync batch. If the file is underneath this
/// file size limit at the start of a sync operation, the entire sync
/// batch will be written to that file even if some of the entries in
/// the batch cause the file to exceed the file size limit.
///
/// See [WriteBufferBuilder::DEFAULT_FILE_ROLLOVER_SIZE_BYTES]
pub fn file_rollover_size(mut self, file_rollover_size: u64) -> Self {
self.file_rollover_size = file_rollover_size;
self
}
/// Consume the builder and create a `WriteBuffer`.
///
/// # Asynchronous considerations
///
/// This method performs blocking IO and care should be taken when using
/// it in an asynchronous context.
pub fn write_buffer(self) -> Result<WriteBuffer> {
let rollover_size = self.file_rollover_size;
WriteBuffer::new(self.file_locator(), rollover_size)
}
/// Consume the builder to get an iterator of all entries in this
/// Write Buffer that have been persisted to disk.
///
/// Sequence numbers on the entries will be in increasing order, but if
/// files have been modified or deleted since getting this iterator,
/// there may be gaps in the sequence.
///
/// # Asynchronous considerations
///
/// This method performs blocking IO and care should be taken when using
/// it in an asynchronous context.
pub fn entries(self) -> Result<impl Iterator<Item = Result<Entry>>> {
Loader::load(self.file_locator())
}
fn file_locator(self) -> FileLocator {
FileLocator {
root: self.root,
file_rollover_size: self.file_rollover_size,
}
}
}
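The per-sync-batch rollover rule documented on `file_rollover_size` is easier to see with a concrete sequence of calls. A minimal sketch, assuming a caller-supplied root directory and a hypothetical `rollover_sketch` helper:
```
use write_buffer::{WriteBufferBuilder, WritePayload};

fn rollover_sketch(root: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
    // Tiny rollover threshold so a single sync batch can exceed it.
    let mut write_buffer = WriteBufferBuilder::new(root)
        .file_rollover_size(64)
        .write_buffer()?;

    // Both appends belong to one sync batch, so they land in the same file
    // even though the second append pushes that file past the 64-byte limit.
    write_buffer.append(WritePayload::new(Vec::from("first entry"))?)?;
    write_buffer.append(WritePayload::new(Vec::from("second entry, over the limit"))?)?;
    write_buffer.sync_all()?;

    // The file is over the limit at the start of the next batch, so this
    // entry goes into a freshly rolled-over file.
    write_buffer.append(WritePayload::new(Vec::from("lands in a new file"))?)?;
    write_buffer.sync_all()?;
    Ok(())
}
```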
/// The main Write Buffer type to interact with.
///
/// For use in single-threaded synchronous contexts. For multi-threaded or
/// asynchronous use, wrap the Write Buffer in the appropriate patterns.
///
/// # Example
///
/// This demonstrates using the Write Buffer with the Tokio asynchronous
/// runtime.
///
/// ```
/// # fn example(root_path: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> {
/// use write_buffer::{WriteBufferBuilder, WritePayload};
///
/// // This Write Buffer should be either protected with a mutex or moved into a single
/// // worker thread that receives writes from channels.
/// let mut write_buffer = WriteBufferBuilder::new(root_path).write_buffer()?;
///
/// // Now create a payload and append it
/// let payload = WritePayload::new(Vec::from("some data"))?;
///
/// // append will create a new Write Buffer entry with its own sequence number,
/// // which is returned
/// let sequence_number = write_buffer.append(payload)?;
///
/// // after appends, call sync_all to fsync the underlying Write Buffer file
/// write_buffer.sync_all()?;
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct WriteBuffer {
files: FileLocator,
sequence_number: u64,
total_size: u64,
active_file: Option<File>,
file_rollover_size: u64,
}
impl WriteBuffer {
fn new(files: FileLocator, file_rollover_size: u64) -> Result<Self> {
let last_sequence_number = Loader::last_sequence_number(&files)?;
let sequence_number = last_sequence_number.map_or(0, |last| last + 1);
let total_size = files.total_size();
Ok(Self {
files,
sequence_number,
total_size,
file_rollover_size,
active_file: None,
})
}
/// A path to a file for storing arbitrary metadata about this Write Buffer,
/// guaranteed not to collide with the data files.
pub fn metadata_path(&self) -> PathBuf {
self.files.root.join("metadata")
}
/// Appends a WritePayload to the active segment file in the Write Buffer
/// and returns its assigned sequence number.
///
/// To ensure the data is written to disk, `sync_all` should be called after
/// a single or batch of append operations.
pub fn append(&mut self, payload: WritePayload) -> Result<SequenceNumber> {
let sequence_number = self.sequence_number;
let mut f = match self.active_file.take() {
Some(f) => f,
None => self.files.open_file_for_append(sequence_number)?,
};
let h = Header {
sequence_number,
checksum: payload.checksum,
len: payload.len,
};
h.write(&mut f)?;
f.write_all(&payload.data).context(UnableToWriteData)?;
self.total_size += Header::LEN + payload.len as u64;
self.active_file = Some(f);
self.sequence_number += 1;
Ok(sequence_number)
}
/// Total size, in bytes, of all the data in all the files in the Write
/// Buffer. If files are deleted from disk without deleting them through
/// the Write Buffer, the size won't reflect that deletion until the
/// Write Buffer is recreated.
pub fn total_size(&self) -> u64 {
self.total_size
}
/// Deletes files up to, but not including, the file that contains the entry
/// number specified
pub fn delete_up_to_entry(&self, entry_number: u64) -> Result<()> {
let mut iter = self.files.existing_filenames()?.peekable();
let hypothetical_filename = self
.files
.filename_starting_at_sequence_number(entry_number);
while let Some(inner_path) = iter.next() {
if iter.peek().map_or(false, |p| p < &hypothetical_filename) {
// Intentionally ignore failures. Should we collect them for reporting instead?
let _ = fs::remove_file(inner_path);
} else {
break;
}
}
Ok(())
}
/// Flushes all pending bytes in the active segment file to disk and closes it
/// if it is over the file rollover size.
pub fn sync_all(&mut self) -> Result<()> {
let f = self.active_file.take();
if let Some(f) = f {
f.sync_all().context(UnableToSync)?;
let meta = f.metadata().context(UnableToReadFileMetadata)?;
if meta.len() < self.file_rollover_size {
self.active_file = Some(f);
}
}
Ok(())
}
}
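`delete_up_to_entry` works at file granularity, so entries that share a file with the requested entry number survive the call. A small sketch with a hypothetical `trim_sketch` helper and an arbitrary entry number:
```
fn trim_sketch(write_buffer: &write_buffer::WriteBuffer) -> write_buffer::Result<()> {
    // Removes whole files that end before the file containing entry 3.
    // Earlier entries that live in the same file as entry 3 stay on disk
    // and remain readable through the builder's `entries()` iterator.
    write_buffer.delete_up_to_entry(3)?;

    // Note: `total_size()` is not adjusted by deletions; it is recomputed
    // the next time a Write Buffer is created over this directory.
    Ok(())
}
```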
// Manages files within the Write Buffer directory
#[derive(Debug)]
struct FileLocator {
root: PathBuf,
file_rollover_size: u64,
}
impl FileLocator {
const PREFIX: &'static str = "wb_";
const EXTENSION: &'static str = "db";
fn open_files_for_read(&self) -> Result<impl Iterator<Item = Result<Option<File>>> + '_> {
Ok(self
.existing_filenames()?
.map(move |path| self.open_file_for_read(&path)))
}
fn total_size(&self) -> u64 {
self.existing_filenames()
.map(|files| {
files
.map(|file| {
fs::metadata(file)
.map(|metadata| metadata.len())
.unwrap_or(0)
})
.sum()
})
.unwrap_or(0)
}
fn open_file_for_read(&self, path: &Path) -> Result<Option<File>> {
let r = OpenOptions::new()
.read(true)
.write(false)
.create(false)
.open(&path);
match r {
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
r => r
.map(Some)
.context(UnableToOpenFile { path })
.map_err(Into::into),
}
}
fn open_file_for_append(&self, starting_sequence_number: u64) -> Result<File> {
// Is there an existing file?
let file_name = self
.active_filename()?
.filter(|existing| {
// If there is an existing file, check its size.
fs::metadata(&existing)
// Use the existing file if its size is under the file size limit.
.map(|metadata| metadata.len() < self.file_rollover_size)
.unwrap_or(false)
})
// If there is no file or the file is over the file size limit, start a new file.
.unwrap_or_else(|| self.filename_starting_at_sequence_number(starting_sequence_number));
Ok(OpenOptions::new()
.read(false)
.append(true)
.create(true)
.open(&file_name)
.context(UnableToOpenFile { path: file_name })?)
}
fn active_filename(&self) -> Result<Option<PathBuf>> {
Ok(self.existing_filenames()?.last())
}
fn existing_filenames(&self) -> Result<impl Iterator<Item = PathBuf>> {
static FILENAME_PATTERN: Lazy<Regex> = Lazy::new(|| {
let pattern = format!(r"^{}[0-9a-f]{{16}}$", FileLocator::PREFIX);
Regex::new(&pattern).expect("Hardcoded regex should be valid")
});
let mut write_buffer_paths: Vec<_> = fs::read_dir(&self.root)
.context(UnableToReadDirectoryContents { path: &self.root })?
.flatten() // Discard errors
.map(|e| e.path())
.filter(|path| path.extension() == Some(OsStr::new(Self::EXTENSION)))
.filter(|path| {
path.file_stem().map_or(false, |file_stem| {
let file_stem = file_stem.to_string_lossy();
FILENAME_PATTERN.is_match(&file_stem)
})
})
.collect();
write_buffer_paths.sort();
Ok(write_buffer_paths.into_iter())
}
fn filename_starting_at_sequence_number(&self, starting_sequence_number: u64) -> PathBuf {
let file_stem = format!("{}{:016x}", Self::PREFIX, starting_sequence_number);
let mut filename = self.root.join(file_stem);
filename.set_extension(Self::EXTENSION);
filename
}
}
/// Produces an iterator over the on-disk entries in the Write Buffer.
///
/// # Asynchronous considerations
///
/// This type performs blocking IO and care should be taken when using
/// it in an asynchronous context.
#[derive(Debug)]
struct Loader;
impl Loader {
fn last_sequence_number(files: &FileLocator) -> Result<Option<u64>> {
let last = Self::headers(files)?.last().transpose()?;
Ok(last.map(|h| h.sequence_number))
}
fn headers(files: &FileLocator) -> Result<impl Iterator<Item = Result<Header>>> {
let r = files
.open_files_for_read()?
.flat_map(|result_option_file| result_option_file.transpose())
.map(|result_file| result_file.and_then(Self::headers_from_one_file));
itertools::process_results(r, |iterator_of_iterators_of_result_headers| {
iterator_of_iterators_of_result_headers
.flatten()
.collect::<Vec<_>>()
.into_iter()
})
}
fn headers_from_one_file(mut file: File) -> Result<impl Iterator<Item = Result<Header>>> {
let metadata = file.metadata().context(UnableToReadFileMetadata)?;
let mut length_remaining = metadata.len();
Ok(Box::new(iter::from_fn(move || {
if length_remaining == 0 {
return None;
}
match Header::read(&mut file) {
Ok(header) => {
let data_len = i64::from(header.len);
file.seek(SeekFrom::Current(data_len)).unwrap();
length_remaining -= Header::LEN + u64::from(header.len);
Some(Ok(header))
}
Err(e) => Some(Err(e)),
}
})))
}
fn load(files: FileLocator) -> Result<impl Iterator<Item = Result<Entry>>> {
let r = files
.open_files_for_read()?
.flat_map(|result_option_file| result_option_file.transpose())
.map(|result_file| result_file.and_then(Self::load_from_one_file));
itertools::process_results(r, |iterator_of_iterators_of_result_entries| {
iterator_of_iterators_of_result_entries
.flatten()
.collect::<Vec<_>>()
.into_iter()
})
}
fn load_from_one_file(mut file: File) -> Result<impl Iterator<Item = Result<Entry>>> {
let metadata = file.metadata().context(UnableToReadFileMetadata)?;
let mut length_remaining = metadata.len();
Ok(Box::new(iter::from_fn(move || {
if length_remaining == 0 {
return None;
}
match Self::load_one(&mut file) {
Ok((entry, bytes_read)) => {
length_remaining -= bytes_read;
Some(Ok(entry))
}
Err(e) => Some(Err(e)),
}
})))
}
fn load_one(file: &mut File) -> Result<(Entry, u64)> {
let header = Header::read(&mut *file)?;
let expected_len_us =
usize::try_from(header.len).expect("Only designed to run on 32-bit systems or higher");
let mut compressed_data = Vec::with_capacity(expected_len_us);
let actual_compressed_len = file
.take(u64::from(header.len))
.read_to_end(&mut compressed_data)
.context(UnableToReadData)?;
ensure!(
expected_len_us == actual_compressed_len,
LengthMismatch {
expected: expected_len_us,
actual: actual_compressed_len
}
);
let mut hasher = Hasher::new();
hasher.update(&compressed_data);
let actual_checksum = hasher.finalize();
ensure!(
header.checksum == actual_checksum,
ChecksumMismatch {
expected: header.checksum,
actual: actual_checksum
}
);
let mut decoder = snap::raw::Decoder::new();
let data = decoder
.decompress_vec(&compressed_data)
.context(UnableToDecompressData)?;
let entry = Entry {
sequence_number: header.sequence_number,
data,
};
let bytes_read = Header::LEN + u64::from(header.len);
Ok((entry, bytes_read))
}
}
#[derive(Debug)]
struct Header {
sequence_number: u64,
checksum: u32,
len: u32,
}
impl Header {
const LEN: u64 = (mem::size_of::<u64>() + mem::size_of::<u32>() + mem::size_of::<u32>()) as u64;
fn read(mut r: impl Read) -> Result<Self> {
let sequence_number = r
.read_u64::<LittleEndian>()
.context(UnableToReadSequenceNumber)?;
let checksum = r.read_u32::<LittleEndian>().context(UnableToReadChecksum)?;
let len = r.read_u32::<LittleEndian>().context(UnableToReadLength)?;
Ok(Self {
sequence_number,
checksum,
len,
})
}
fn write(&self, mut w: impl Write) -> Result<()> {
w.write_u64::<LittleEndian>(self.sequence_number)
.context(UnableToWriteSequenceNumber)?;
w.write_u32::<LittleEndian>(self.checksum)
.context(UnableToWriteChecksum)?;
w.write_u32::<LittleEndian>(self.len)
.context(UnableToWriteLength)?;
Ok(())
}
}
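Each entry is framed by a fixed 16-byte little-endian header: a u64 sequence number, a u32 CRC32 of the compressed payload, and a u32 compressed length. A sketch of that layout using the same `byteorder` calls, with hypothetical values and a hypothetical `header_layout_sketch` helper:
```
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::Cursor;

fn header_layout_sketch() -> std::io::Result<()> {
    // Hypothetical values for one entry.
    let (sequence_number, checksum, len) = (7u64, 0xdead_beef_u32, 42u32);

    // Same field order and endianness as `Header::write`: 8 + 4 + 4 = 16 bytes.
    let mut buf = Vec::new();
    buf.write_u64::<LittleEndian>(sequence_number)?;
    buf.write_u32::<LittleEndian>(checksum)?;
    buf.write_u32::<LittleEndian>(len)?;
    assert_eq!(buf.len(), 16);

    // `Header::read` performs the mirror-image reads.
    let mut cursor = Cursor::new(buf);
    assert_eq!(cursor.read_u64::<LittleEndian>()?, sequence_number);
    assert_eq!(cursor.read_u32::<LittleEndian>()?, checksum);
    assert_eq!(cursor.read_u32::<LittleEndian>()?, len);
    Ok(())
}
```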
/// One batch of data read from the Write Buffer.
///
/// This corresponds to one call to `WriteBuffer::append`.
#[derive(Debug, Clone)]
pub struct Entry {
sequence_number: u64,
data: Vec<u8>,
}
impl Entry {
/// Gets the unique, increasing sequence number associated with this data
pub fn sequence_number(&self) -> u64 {
self.sequence_number
}
/// Gets a reference to the entry's data
pub fn as_data(&self) -> &[u8] {
&self.data
}
/// Gets the entry's data
pub fn into_data(self) -> Vec<u8> {
self.data
}
}
/// A single write to append to the Write Buffer file
#[derive(Debug)]
pub struct WritePayload {
checksum: u32,
data: Vec<u8>,
len: u32,
}
impl WritePayload {
/// Initializes a write payload, compresses the data, and computes its CRC.
pub fn new(uncompressed_data: Vec<u8>) -> Result<Self> {
// Only designed to support chunks up to `u32::max` bytes long.
let uncompressed_len = uncompressed_data.len();
let _ = u32::try_from(uncompressed_len).context(ChunkSizeTooLarge {
actual: uncompressed_len,
})?;
let mut encoder = snap::raw::Encoder::new();
let compressed_data = encoder
.compress_vec(&uncompressed_data)
.context(UnableToCompressData)?;
let actual_compressed_len = compressed_data.len();
let actual_compressed_len =
u32::try_from(actual_compressed_len).context(ChunkSizeTooLarge {
actual: actual_compressed_len,
})?;
let mut hasher = Hasher::new();
hasher.update(&compressed_data);
let checksum = hasher.finalize();
Ok(Self {
checksum,
data: compressed_data,
len: actual_compressed_len,
})
}
}
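`WritePayload::new` compresses first and checksums the *compressed* bytes; `Loader::load_one` verifies that checksum over the bytes read back from disk before decompressing. A minimal sketch of that round trip using the same `snap` and `crc32fast` calls, with a hypothetical `payload_roundtrip_sketch` helper:
```
use crc32fast::Hasher;

fn payload_roundtrip_sketch(uncompressed: Vec<u8>) -> Result<(), snap::Error> {
    // Compress first, then checksum the compressed form (the order used above).
    let mut encoder = snap::raw::Encoder::new();
    let compressed = encoder.compress_vec(&uncompressed)?;
    let mut hasher = Hasher::new();
    hasher.update(&compressed);
    let expected_checksum = hasher.finalize();

    // On read, the checksum is recomputed over the bytes from disk and must
    // match before decompression is attempted.
    let mut hasher = Hasher::new();
    hasher.update(&compressed);
    assert_eq!(hasher.finalize(), expected_checksum);

    let mut decoder = snap::raw::Decoder::new();
    let decompressed = decoder.decompress_vec(&compressed)?;
    assert_eq!(decompressed, uncompressed);
    Ok(())
}
```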
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sequence_numbers_are_persisted() {
let dir = test_helpers::tmp_dir().unwrap();
let builder = WriteBufferBuilder::new(dir.as_ref());
let mut write_buffer;
// Create one Write Buffer, append an entry, and sync it
{
write_buffer = builder.clone().write_buffer().unwrap();
let data = Vec::from("somedata");
let data = WritePayload::new(data).unwrap();
let seq = write_buffer.append(data).unwrap();
assert_eq!(0, seq);
write_buffer.sync_all().unwrap();
}
// Pretend the process restarts
{
write_buffer = builder.write_buffer().unwrap();
assert_eq!(1, write_buffer.sequence_number);
}
}
#[test]
fn sequence_numbers_increase_by_number_of_pending_entries() {
let dir = test_helpers::tmp_dir().unwrap();
let builder = WriteBufferBuilder::new(dir.as_ref());
let mut write_buffer = builder.write_buffer().unwrap();
// Write 1 entry then sync
let data = Vec::from("some");
let data = WritePayload::new(data).unwrap();
let seq = write_buffer.append(data).unwrap();
write_buffer.sync_all().unwrap();
assert_eq!(0, seq);
// Sequence number should increase by 1
assert_eq!(1, write_buffer.sequence_number);
// Write 2 entries then sync
let data = Vec::from("other");
let data = WritePayload::new(data).unwrap();
let seq = write_buffer.append(data).unwrap();
assert_eq!(1, seq);
let data = Vec::from("again");
let data = WritePayload::new(data).unwrap();
let seq = write_buffer.append(data).unwrap();
assert_eq!(2, seq);
write_buffer.sync_all().unwrap();
// Sequence number should increase by 2
assert_eq!(3, write_buffer.sequence_number);
}
}

View File

@@ -1,166 +0,0 @@
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self
)]
use crate::{Error as WriteBufferError, SequenceNumber, WriteBufferBuilder, WritePayload};
use futures::{channel::mpsc, SinkExt, StreamExt};
use snafu::{ResultExt, Snafu};
use observability_deps::tracing::{error, info};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Snafu)]
/// Error type
pub enum Error {
#[snafu(display("Write Buffer Writer error using Write Buffer: {}", source))]
UnderlyingWriteBufferError { source: WriteBufferError },
#[snafu(display("Error serializing metadata: {}", source))]
SerializeMetadata { source: serde_json::error::Error },
#[snafu(display("Error writing to Write Buffer: {}", source))]
WritingToWriteBuffer { source: std::io::Error },
#[snafu(display("Error writing metadata to '{:?}': {}", metadata_path, source))]
WritingMetadata {
metadata_path: PathBuf,
source: std::io::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct WriteBufferDetails {
pub metadata_path: PathBuf,
pub metadata: WriteBufferMetadata,
pub write_tx: mpsc::Sender<WriteBufferWrite>,
}
#[derive(Debug)]
pub struct WriteBufferWrite {
payload: WritePayload,
notify_tx: mpsc::Sender<Result<SequenceNumber, WriteBufferError>>,
}
impl WriteBufferDetails {
pub async fn write_metadata(&self) -> Result<()> {
Ok(tokio::fs::write(
self.metadata_path.clone(),
serde_json::to_string(&self.metadata).context(SerializeMetadata)?,
)
.await
.context(WritingMetadata {
metadata_path: &self.metadata_path,
})?)
}
pub async fn write_and_sync(&self, data: Vec<u8>) -> Result<()> {
let payload = WritePayload::new(data).context(UnderlyingWriteBufferError {})?;
let (notify_tx, mut notify_rx) = mpsc::channel(1);
let write = WriteBufferWrite { payload, notify_tx };
let mut tx = self.write_tx.clone();
tx.send(write)
.await
.expect("The Write Buffer thread should always be running to receive a write");
let _ = notify_rx
.next()
.await
.expect("The Write Buffer thread should always be running to send a response.")
.context(UnderlyingWriteBufferError {})?;
Ok(())
}
}
/// Metadata about this particular Write Buffer
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct WriteBufferMetadata {
pub format: WriteBufferFormat,
}
impl Default for WriteBufferMetadata {
fn default() -> Self {
Self {
format: WriteBufferFormat::FlatBuffers,
}
}
}
/// Supported WriteBuffer formats that can be restored
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub enum WriteBufferFormat {
FlatBuffers,
#[serde(other)]
Unknown,
}
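The `#[serde(other)]` variant is there so that metadata written with a format this build does not know about still deserializes (as `Unknown`) instead of failing. A sketch with a hypothetical `metadata_sketch` helper, assuming the fallback behaves as intended:
```
fn metadata_sketch() -> serde_json::Result<()> {
    // Writing the default metadata produces {"format":"FlatBuffers"}.
    let json = serde_json::to_string(&WriteBufferMetadata::default())?;
    let restored: WriteBufferMetadata = serde_json::from_str(&json)?;
    assert_eq!(restored.format, WriteBufferFormat::FlatBuffers);

    // A format written by a newer build falls back to Unknown (assumption:
    // the #[serde(other)] variant catches unrecognized strings) rather than
    // failing the whole restore.
    let future: WriteBufferMetadata =
        serde_json::from_str(r#"{"format":"SomeFutureFormat"}"#)?;
    assert_eq!(future.format, WriteBufferFormat::Unknown);
    Ok(())
}
```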
pub async fn start_write_buffer_sync_task(
write_buffer_builder: WriteBufferBuilder,
) -> Result<WriteBufferDetails> {
let mut write_buffer = write_buffer_builder
.write_buffer()
.context(UnderlyingWriteBufferError)?;
let metadata = tokio::fs::read_to_string(write_buffer.metadata_path())
.await
.and_then(|raw_metadata| {
serde_json::from_str::<WriteBufferMetadata>(&raw_metadata).map_err(Into::into)
})
.unwrap_or_default();
let metadata_path = write_buffer.metadata_path();
let (write_tx, mut write_rx) = mpsc::channel::<WriteBufferWrite>(100);
tokio::spawn({
async move {
loop {
match write_rx.next().await {
Some(write) => {
let payload = write.payload;
let mut tx = write.notify_tx;
let result = write_buffer.append(payload).and_then(|seq| {
write_buffer.sync_all()?;
Ok(seq)
});
if let Err(e) = tx.send(result).await {
error!("error sending result back to writer {:?}", e);
}
}
None => {
info!(
"shutting down Write Buffer for {:?}",
write_buffer.metadata_path()
);
return;
}
}
}
}
});
Ok(WriteBufferDetails {
metadata_path,
metadata,
write_tx,
})
}
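A caller hands a builder to `start_write_buffer_sync_task`, keeps the returned `WriteBufferDetails`, and funnels writes through `write_and_sync`, which waits for the background task to append and fsync before returning. A sketch with a hypothetical `writer_sketch` helper, assuming a Tokio runtime and a caller-supplied directory:
```
async fn writer_sketch(root: std::path::PathBuf) -> Result<()> {
    let builder = WriteBufferBuilder::new(root);
    let details = start_write_buffer_sync_task(builder).await?;

    // Record the metadata (currently just the payload format) next to the data files.
    details.write_metadata().await?;

    // Sends the payload to the background task, which appends and fsyncs
    // before the result is sent back and this call returns.
    details.write_and_sync(Vec::from("some entry bytes")).await?;
    Ok(())
}
```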
#[cfg(test)]
mod tests {
#[test]
fn it_works_but_has_no_tests() {
// :thinking_face:
}
}

View File

@@ -1,100 +0,0 @@
use write_buffer::{WriteBufferBuilder, WritePayload};
#[macro_use]
mod helpers;
use crate::helpers::*;
#[test]
#[allow(clippy::cognitive_complexity)]
fn delete_up_to() -> Result {
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test interaction with file rollover
let builder = WriteBufferBuilder::new(dir.as_ref()).file_rollover_size(100);
let mut write_buffer = builder.clone().write_buffer()?;
create_and_sync_batch!(
write_buffer,
[
"some data within the file limit",
"some more data that puts the file over the limit"
]
);
// Write one Write Buffer entry, and because the existing file is over the size
// limit, this entry should end up in a new Write Buffer file
create_and_sync_batch!(
write_buffer,
["some more data, this should now be rolled over into the next Write Buffer file"]
);
// Write two Write Buffer entries, one that could fit in the existing file but
// puts the file over the limit. Because the two entries are in one sync
// batch, they both will end up in the existing file even though it's over
// the limit after the first entry.
create_and_sync_batch!(
write_buffer,
[
"one entry that puts the existing file over the limit",
"another entry"
]
);
// There should be two existing Write Buffer files
assert_filenames_for_sequence_numbers!(dir, [0, 2]);
// Should be able to read the entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(5, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
0,
b"some data within the file limit"
);
assert_entry!(
write_buffer_entries[1],
1,
b"some more data that puts the file over the limit"
);
assert_entry!(
write_buffer_entries[2],
2,
b"some more data, this should now be rolled over into the next Write Buffer file"
);
assert_entry!(
write_buffer_entries[3],
3,
b"one entry that puts the existing file over the limit"
);
assert_entry!(write_buffer_entries[4], 4, b"another entry");
// Not including 3!
write_buffer.delete_up_to_entry(3)?;
// There should be one existing Write Buffer file
assert_filenames_for_sequence_numbers!(dir, [2]);
// Add another entry; the sequence numbers continue
create_and_sync_batch!(write_buffer, ["entry after deletion"]);
// Should be able to read the entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(4, write_buffer_entries.len());
// 2 is still readable, because we asked to delete it but couldn't because it
// was in a file with 3.
assert_entry!(
write_buffer_entries[0],
2,
b"some more data, this should now be rolled over into the next Write Buffer file"
);
assert_entry!(
write_buffer_entries[1],
3,
b"one entry that puts the existing file over the limit"
);
assert_entry!(write_buffer_entries[2], 4, b"another entry");
assert_entry!(write_buffer_entries[3], 5, b"entry after deletion");
Ok(())
}

View File

@@ -1,154 +0,0 @@
use std::fs;
use write_buffer::{WriteBufferBuilder, WritePayload};
#[macro_use]
mod helpers;
use crate::helpers::*;
#[test]
#[allow(clippy::cognitive_complexity)]
fn file_rollover() -> Result {
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test rollover
let builder = WriteBufferBuilder::new(dir.as_ref()).file_rollover_size(100);
let mut write_buffer = builder.clone().write_buffer()?;
// Should start without existing Write Buffer files
let write_buffer_files = write_buffer_file_names(&dir.as_ref());
assert!(write_buffer_files.is_empty());
// Reading the Write Buffer should return Ok(empty vec)
let write_buffer_entries = all_entries(&builder)?;
assert!(write_buffer_entries.is_empty());
// Write one Write Buffer entry when there are no existing Write Buffer files
create_and_sync_batch!(write_buffer, ["some data within the file limit"]);
// There should now be one existing Write Buffer file
assert_filenames_for_sequence_numbers!(dir, [0]);
// Should be able to read the entry back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(1, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
0,
b"some data within the file limit"
);
// Write one Write Buffer entry when there is an existing Write Buffer file
// that is currently under the size limit, should end up in the same Write
// Buffer file
create_and_sync_batch!(
write_buffer,
["some more data that puts the file over the limit"]
);
// There should still be one existing Write Buffer file
assert_filenames_for_sequence_numbers!(dir, [0]);
// Should be able to read the entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(2, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
0,
b"some data within the file limit"
);
assert_entry!(
write_buffer_entries[1],
1,
b"some more data that puts the file over the limit",
);
// Write one Write Buffer entry, and because the existing file is over the size
// limit, this entry should end up in a new Write Buffer file
create_and_sync_batch!(
write_buffer,
["some more data, this should now be rolled over into the next Write Buffer file"]
);
// There should now be two existing Write Buffer files
assert_filenames_for_sequence_numbers!(dir, [0, 2]);
// Should be able to read the entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(3, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
0,
b"some data within the file limit"
);
assert_entry!(
write_buffer_entries[1],
1,
b"some more data that puts the file over the limit"
);
assert_entry!(
write_buffer_entries[2],
2,
b"some more data, this should now be rolled over into the next Write Buffer file"
);
// Write two Write Buffer entries, one that could fit in the existing file but
// puts the file over the limit. Because the two entries are in one sync
// batch, they both will end up in the existing file even though it's over
// the limit after the first entry.
create_and_sync_batch!(
write_buffer,
[
"one entry that puts the existing file over the limit",
"another entry"
]
);
// There should still be two existing Write Buffer files
assert_filenames_for_sequence_numbers!(dir, [0, 2]);
// Should be able to read the entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(5, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
0,
b"some data within the file limit"
);
assert_entry!(
write_buffer_entries[1],
1,
b"some more data that puts the file over the limit"
);
assert_entry!(
write_buffer_entries[2],
2,
b"some more data, this should now be rolled over into the next Write Buffer file"
);
assert_entry!(
write_buffer_entries[3],
3,
b"one entry that puts the existing file over the limit"
);
assert_entry!(write_buffer_entries[4], 4, b"another entry");
// Some process deletes the first Write Buffer file
let path = dir.path().join(file_name_for_sequence_number(0));
fs::remove_file(path)?;
// Should be able to read the remaining entries back out
let write_buffer_entries = all_entries(&builder)?;
assert_eq!(3, write_buffer_entries.len());
assert_entry!(
write_buffer_entries[0],
2,
b"some more data, this should now be rolled over into the next Write Buffer file"
);
assert_entry!(
write_buffer_entries[1],
3,
b"one entry that puts the existing file over the limit"
);
assert_entry!(write_buffer_entries[2], 4, b"another entry");
Ok(())
}

View File

@@ -1,75 +0,0 @@
#![allow(unused_macros)]
#![allow(dead_code)]
use std::{fs, path::PathBuf};
use write_buffer::{Entry, WriteBufferBuilder};
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type Result<T = (), E = TestError> = std::result::Result<T, E>;
pub fn write_buffer_file_names(dir: impl Into<PathBuf>) -> Vec<String> {
write_buffer_paths(dir)
.iter()
.filter_map(|path| path.file_name().map(|p| p.to_string_lossy().to_string()))
.collect()
}
pub fn write_buffer_paths(dir: impl Into<PathBuf>) -> Vec<PathBuf> {
let mut paths: Vec<_> = fs::read_dir(&dir.into())
.expect("Cannot read Write Buffer directory")
.flatten() // Ignore errors
.map(|entry| entry.path())
.collect();
paths.sort();
paths
}
pub fn total_size_on_disk(dir: impl Into<PathBuf>) -> u64 {
write_buffer_paths(&dir.into())
.iter()
.map(|file| {
fs::metadata(file)
.expect("Could not read file metadata")
.len()
})
.sum()
}
pub fn file_name_for_sequence_number(id: u64) -> String {
format!("wb_{:016x}.db", id)
}
pub fn all_entries(builder: &WriteBufferBuilder) -> Result<Vec<Entry>> {
builder
.clone()
.entries()?
.collect::<Result<Vec<_>, _>>()
.map_err(Into::into)
}
macro_rules! assert_filenames_for_sequence_numbers {
($dir:expr, [$($id:expr),* $(,)?] $(,)?) => {{
let actual = write_buffer_file_names(&$dir.as_ref());
let expected = [$(file_name_for_sequence_number($id)),*];
assert_eq!(actual, expected);
}};
}
macro_rules! assert_entry {
($entry:expr, $seq_num:expr, $data: expr $(,)?) => {{
assert_eq!($seq_num, $entry.sequence_number());
assert_eq!($data.as_ref(), $entry.as_data());
}};
}
macro_rules! create_and_sync_batch {
($write_buffer:expr, [$($entry:expr),* $(,)?] $(,)?) => {{
$({
let data = Vec::from($entry);
let data = WritePayload::new(data)?;
$write_buffer.append(data)?;
})*
$write_buffer.sync_all()?;
}};
}

View File

@@ -1,21 +0,0 @@
use write_buffer::{WriteBufferBuilder, WritePayload};
#[test]
fn no_concurrency() {
let dir = test_helpers::tmp_dir().unwrap();
let builder = WriteBufferBuilder::new(dir.as_ref());
let mut write_buffer = builder.clone().write_buffer().unwrap();
let data = Vec::from("somedata");
let payload = WritePayload::new(data).unwrap();
let sequence_number = write_buffer.append(payload).unwrap();
write_buffer.sync_all().unwrap();
assert_eq!(0, sequence_number);
let write_buffer_entries: Result<Vec<_>, _> = builder.entries().unwrap().collect();
let write_buffer_entries = write_buffer_entries.unwrap();
assert_eq!(1, write_buffer_entries.len());
assert_eq!(b"somedata".as_ref(), write_buffer_entries[0].as_data());
assert_eq!(0, write_buffer_entries[0].sequence_number());
}

View File

@@ -1,79 +0,0 @@
use std::fs;
use write_buffer::{WriteBufferBuilder, WritePayload};
#[macro_use]
mod helpers;
use helpers::Result;
#[test]
#[allow(clippy::cognitive_complexity)]
fn total_size() -> Result {
let dir = test_helpers::tmp_dir()?;
// Set the file rollover size limit low to test how rollover interacts with
// total size
let builder = WriteBufferBuilder::new(dir.as_ref()).file_rollover_size(100);
let mut write_buffer = builder.clone().write_buffer()?;
// Should start without existing Write Buffer files; this implies total file
// size on disk is 0
let write_buffer_files = helpers::write_buffer_file_names(&dir.as_ref());
assert!(write_buffer_files.is_empty());
// Total size should be 0
assert_eq!(write_buffer.total_size(), 0);
create_and_sync_batch!(write_buffer, ["some data within the file limit"]);
// Total size should be that of all the files
assert_eq!(
write_buffer.total_size(),
helpers::total_size_on_disk(&dir.as_ref())
);
// Write one Write Buffer entry that ends up in the same Write Buffer file
create_and_sync_batch!(
write_buffer,
["some more data that puts the file over the limit"]
);
// Total size should be that of all the files
assert_eq!(
write_buffer.total_size(),
helpers::total_size_on_disk(&dir.as_ref())
);
// Write one Write Buffer entry, and because the existing file is over the size
// limit, this entry should end up in a new Write Buffer file
create_and_sync_batch!(
write_buffer,
["some more data, this should now be rolled over into the next Write Buffer file"]
);
// Total size should be that of all the files
assert_eq!(
write_buffer.total_size(),
helpers::total_size_on_disk(&dir.as_ref())
);
let total_file_size_before_delete = helpers::total_size_on_disk(&dir.as_ref());
// Some process deletes the first Write Buffer file
let path = dir.path().join(helpers::file_name_for_sequence_number(0));
fs::remove_file(path)?;
// Total size isn't aware of the out-of-band deletion
assert_eq!(write_buffer.total_size(), total_file_size_before_delete);
// Pretend the process restarts
let write_buffer = builder.write_buffer()?;
// Total size should be that of all the files, so without the file deleted
// out-of-band
assert_eq!(
write_buffer.total_size(),
helpers::total_size_on_disk(&dir.as_ref())
);
Ok(())
}