feat: Segment the write buffer on time (#24745)

* Split WriteBuffer into segments

* Add SegmentRange and SegmentDuration
* Update WAL to store SegmentRange and to be able to open up multiple ranges
* Remove Partitioner and PartitionBuffer

* Update SegmentState and loader

* Update SegmentState with current, next and outside
* Update loader and tests to load up current, next and previous outside segments based on the passed-in time and desired segment duration
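
As a sketch, the current and next ranges come straight from the clock and the configured duration, using the types this commit introduces (`time_provider` is assumed in scope):

    use std::str::FromStr;

    // illustrative: pick the segment that contains "now" and the one after it
    let segment_duration = SegmentDuration::from_str("1h").unwrap();
    let now = time_provider.now();
    let current_range = SegmentRange::from_time_and_duration(now, segment_duration, false);
    let next_range = current_range.next();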

* Update WriteBufferImpl and Flusher

* Update the flusher to flush to multiple segments
* Update WriteBufferImpl to split data into the segments being written to
* Update HTTP and WriteBuffer to use TimeProvider
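
A minimal sketch of that split, mirroring the SegmentedWalOps alias added in flusher.rs (the validated data is assumed to already carry its segment_start):

    use std::collections::HashMap;
    use iox_time::Time;

    // bucket WAL ops by the start time of the segment they fall into;
    // each bucket is then flushed to that segment's WAL writer
    let mut segmented_ops: HashMap<Time, Vec<WalOp>> = HashMap::new();
    for data in segmented_data {
        segmented_ops.entry(data.segment_start).or_default().push(data.wal_op);
    }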

* Wire up outside segment writes and loading

* Data outside the current and next segments no longer goes to a single catch-all segment, but to a segment based on that data's time. Limits the number of segments open for writes at any given time to 100.
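
Roughly, as a hedged sketch (`open_segments` and the constant are illustrative, not the exact implementation):

    // map a row's timestamp to its segment start; refuse to open a new
    // segment once the cap described above is reached
    const OPEN_SEGMENT_LIMIT: usize = 100;
    let start = segment_duration.start_time(row_timestamp_seconds);
    if !open_segments.contains_key(&start) && open_segments.len() >= OPEN_SEGMENT_LIMIT {
        return Err(wal::Error::OpenSegmentLimitReached(open_segments.len()));
    }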

* Refactor SegmentDuration, add config option

* Refactors SegmentDuration to be a newtype over Duration
* Adds the clap block configuration to pass SegmentDuration, defaulting to 1h
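
The accepted values map one-to-one onto the FromStr impl shown later in this diff; a quick sketch:

    use std::str::FromStr;
    use std::time::Duration;

    assert_eq!(
        SegmentDuration::from_str("1h").unwrap().as_duration(),
        Duration::from_secs(3600)
    );
    // anything outside the listed values is rejected
    assert!(SegmentDuration::from_str("90m").is_err());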

* refactor: SegmentState and loader

* remove the current_segment and next_segment from the loader and segment state, instead having just a collection of segments
* open up only the current_segment by default
* keep current and next segments open if they exist, while others go into persisting or persisted
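
Sketched shape of the resulting state (field names illustrative, not the exact struct):

    use std::collections::BTreeMap;
    use iox_time::Time;

    struct SegmentStateSketch {
        // all writable segments keyed by start time; current and next stay
        // open while older entries drain into persisting, then persisted
        segments: BTreeMap<Time, OpenBufferSegment>,
        persisting_segments: Vec<ClosedBufferSegment>,
        persisted_segments: Vec<PersistedSegment>,
    }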

* fix: cargo audit

* refactor: fixup PR feedback
pull/24747/head
Paul Dix 2024-03-11 13:54:09 -04:00 committed by GitHub
parent c4d651fbd1
commit bf931970d3
13 changed files with 1288 additions and 712 deletions

Cargo.lock generated
View File

@ -751,9 +751,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.15.3"
version = "3.15.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b"
checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa"
[[package]]
name = "bytecount"
@ -866,9 +866,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.89"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0ba8f7aaa012f30d5b2861462f6708eccd49c3c39863fe083a308035f63d723"
checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5"
dependencies = [
"jobserver",
"libc",
@ -925,9 +925,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.1"
version = "4.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c918d541ef2913577a0f9566e9ce27cb35b6df072075769e0b26cb5a554520da"
checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651"
dependencies = [
"clap_builder",
"clap_derive",
@ -962,9 +962,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.1"
version = "4.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f3e7391dad68afb0c2ede1bf619f579a3dc9c2ec67f089baa397123a2f3d1eb"
checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4"
dependencies = [
"anstream",
"anstyle",
@ -2611,6 +2611,7 @@ dependencies = [
"influxdb-line-protocol",
"iox_catalog",
"iox_query",
"iox_time",
"object_store",
"observability_deps",
"parking_lot",
@ -4627,9 +4628,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqwest"
version = "0.11.24"
version = "0.11.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251"
checksum = "0eea5a9eb898d3783f17c6407670e3592fd174cb81a10e51d4c37f49450b9946"
dependencies = [
"base64 0.21.7",
"bytes",
@ -5662,9 +5663,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
version = "0.30.6"
version = "0.30.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6746919caf9f2a85bff759535664c060109f21975c5ac2e8652e60102bd4d196"
checksum = "0c385888ef380a852a16209afc8cfad22795dd8873d69c9a14d2e2088f118d18"
dependencies = [
"cfg-if",
"core-foundation-sys",
@ -5677,20 +5678,20 @@ dependencies = [
[[package]]
name = "system-configuration"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
dependencies = [
"core-foundation-sys",
"libc",

View File

@ -14,7 +14,9 @@ use influxdb3_server::{
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
@ -133,6 +135,16 @@ pub struct Config {
/// bearer token to be set for requests
#[clap(long = "bearer-token", env = "INFLUXDB3_BEARER_TOKEN", action)]
pub bearer_token: Option<String>,
/// Duration of WAL segments that are persisted to object storage. Valid values: 1m, 5m, 10m,
/// 15m, 30m, 1h, 2h, 4h.
#[clap(
long = "segment-duration",
env = "INFLUXDB3_SEGMENT_DURATION",
default_value = "1h",
action
)]
pub segment_duration: SegmentDuration,
}
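// Illustrative usage: `--segment-duration 15m` on the command line, or
// `INFLUXDB3_SEGMENT_DURATION=15m` in the environment; values parse through
// SegmentDuration's FromStr impl shown later in this diff.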
#[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))]
@ -250,8 +262,17 @@ pub async fn command(config: Config) -> Result<()> {
.wal_directory
.map(|dir| WalImpl::new(dir).map(Arc::new))
.transpose()?;
// TODO: the next segment ID should be loaded from the persister
let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&persister), wal).await?);
let time_provider = Arc::new(SystemProvider::new());
let write_buffer = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
wal,
Arc::clone(&time_provider),
config.segment_duration,
)
.await?,
);
let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
@ -261,12 +282,11 @@ pub async fn command(config: Config) -> Result<()> {
10,
));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
.query_executor(query_executor)
.time_provider(time_provider)
.persister(persister);
let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {

View File

@ -5,8 +5,9 @@ use authz::Authorizer;
use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server};
#[derive(Debug)]
pub struct ServerBuilder<W, Q, P> {
pub struct ServerBuilder<W, Q, P, T> {
common_state: CommonServerState,
time_provider: T,
max_request_size: usize,
write_buffer: W,
query_executor: Q,
@ -14,10 +15,11 @@ pub struct ServerBuilder<W, Q, P> {
authorizer: Arc<dyn Authorizer>,
}
impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister> {
impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider> {
pub fn new(common_state: CommonServerState) -> Self {
Self {
common_state,
time_provider: NoTimeProvider,
max_request_size: usize::MAX,
write_buffer: NoWriteBuf,
query_executor: NoQueryExec,
@ -27,7 +29,7 @@ impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister> {
}
}
impl<W, Q, P> ServerBuilder<W, Q, P> {
impl<W, Q, P, T> ServerBuilder<W, Q, P, T> {
pub fn max_request_size(mut self, max_request_size: usize) -> Self {
self.max_request_size = max_request_size;
self
@ -51,11 +53,16 @@ pub struct WithQueryExec<Q>(Arc<Q>);
pub struct NoPersister;
#[derive(Debug)]
pub struct WithPersister<P>(Arc<P>);
#[derive(Debug)]
pub struct NoTimeProvider;
#[derive(Debug)]
pub struct WithTimeProvider<T>(Arc<T>);
impl<Q, P> ServerBuilder<NoWriteBuf, Q, P> {
pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P> {
impl<Q, P, T> ServerBuilder<NoWriteBuf, Q, P, T> {
pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: WithWriteBuf(wb),
query_executor: self.query_executor,
@ -65,10 +72,11 @@ impl<Q, P> ServerBuilder<NoWriteBuf, Q, P> {
}
}
impl<W, P> ServerBuilder<W, NoQueryExec, P> {
pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P> {
impl<W, P, T> ServerBuilder<W, NoQueryExec, P, T> {
pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: WithQueryExec(qe),
@ -78,10 +86,11 @@ impl<W, P> ServerBuilder<W, NoQueryExec, P> {
}
}
impl<W, Q> ServerBuilder<W, Q, NoPersister> {
pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>> {
impl<W, Q, T> ServerBuilder<W, Q, NoPersister, T> {
pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>, T> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: self.query_executor,
@ -91,12 +100,29 @@ impl<W, Q> ServerBuilder<W, Q, NoPersister> {
}
}
impl<W, Q, P> ServerBuilder<WithWriteBuf<W>, WithQueryExec<Q>, WithPersister<P>> {
pub fn build(self) -> Server<W, Q, P> {
impl<W, Q, P> ServerBuilder<W, Q, P, NoTimeProvider> {
pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>> {
ServerBuilder {
common_state: self.common_state,
time_provider: WithTimeProvider(tp),
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: self.query_executor,
persister: self.persister,
authorizer: self.authorizer,
}
}
}
impl<W, Q, P, T>
ServerBuilder<WithWriteBuf<W>, WithQueryExec<Q>, WithPersister<P>, WithTimeProvider<T>>
{
pub fn build(self) -> Server<W, Q, P, T> {
let persister = Arc::clone(&self.persister.0);
let authorizer = Arc::clone(&self.authorizer);
let http = Arc::new(HttpApi::new(
self.common_state.clone(),
Arc::clone(&self.time_provider.0),
Arc::clone(&self.write_buffer.0),
Arc::clone(&self.query_executor.0),
self.max_request_size,

View File

@ -24,7 +24,7 @@ use influxdb3_write::BufferedWriteRequest;
use influxdb3_write::Precision;
use influxdb3_write::WriteBuffer;
use iox_query_influxql_rewrite as rewrite;
use iox_time::{SystemProvider, TimeProvider};
use iox_time::TimeProvider;
use observability_deps::tracing::{debug, error, info};
use serde::de::DeserializeOwned;
use serde::Deserialize;
@ -273,17 +273,19 @@ impl Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub(crate) struct HttpApi<W, Q> {
pub(crate) struct HttpApi<W, Q, T> {
common_state: CommonServerState,
write_buffer: Arc<W>,
time_provider: Arc<T>,
pub(crate) query_executor: Arc<Q>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
}
impl<W, Q> HttpApi<W, Q> {
impl<W, Q, T> HttpApi<W, Q, T> {
pub(crate) fn new(
common_state: CommonServerState,
time_provider: Arc<T>,
write_buffer: Arc<W>,
query_executor: Arc<Q>,
max_request_bytes: usize,
@ -291,6 +293,7 @@ impl<W, Q> HttpApi<W, Q> {
) -> Self {
Self {
common_state,
time_provider,
write_buffer,
query_executor,
max_request_bytes,
@ -299,10 +302,11 @@ impl<W, Q> HttpApi<W, Q> {
}
}
impl<W, Q> HttpApi<W, Q>
impl<W, Q, T> HttpApi<W, Q, T>
where
W: WriteBuffer,
Q: QueryExecutor,
T: TimeProvider,
Error: From<<Q as QueryExecutor>::Error>,
{
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
@ -316,8 +320,7 @@ where
let database = NamespaceName::new(params.db)?;
// TODO: use the time provider
let default_time = SystemProvider::new().now().timestamp_nanos();
let default_time = self.time_provider.now();
let result = self
.write_buffer
@ -704,8 +707,8 @@ pub(crate) struct WriteParams {
pub(crate) precision: Precision,
}
pub(crate) async fn route_request<W, Q>(
http_server: Arc<HttpApi<W, Q>>,
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<W, Q, T>>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible>
where

View File

@ -27,6 +27,7 @@ use datafusion::execution::SendableRecordBatchStream;
use hyper::service::service_fn;
use influxdb3_write::{Persister, WriteBuffer};
use iox_query::QueryNamespaceProvider;
use iox_time::TimeProvider;
use observability_deps::tracing::{error, info};
use service::hybrid;
use std::convert::Infallible;
@ -113,9 +114,9 @@ impl CommonServerState {
#[allow(dead_code)]
#[derive(Debug)]
pub struct Server<W, Q, P> {
pub struct Server<W, Q, P, T> {
common_state: CommonServerState,
http: Arc<HttpApi<W, Q>>,
http: Arc<HttpApi<W, Q, T>>,
persister: Arc<P>,
authorizer: Arc<dyn Authorizer>,
}
@ -147,26 +148,23 @@ pub enum QueryKind {
Sql,
InfluxQl,
}
impl<W, Q, P> Server<W, Q, P> {
impl<W, Q, P, T> Server<W, Q, P, T> {
pub fn authorizer(&self) -> Arc<dyn Authorizer> {
Arc::clone(&self.authorizer)
}
}
pub async fn serve<W, Q, P>(server: Server<W, Q, P>, shutdown: CancellationToken) -> Result<()>
pub async fn serve<W, Q, P, T>(
server: Server<W, Q, P, T>,
shutdown: CancellationToken,
) -> Result<()>
where
W: WriteBuffer,
Q: QueryExecutor,
http::Error: From<<Q as QueryExecutor>::Error>,
P: Persister,
T: TimeProvider,
{
// TODO:
// 1. load the persisted catalog and segments from the persister
// 2. load semgments into the buffer
// 3. persist any segments from the buffer that are closed and haven't yet been persisted
// 4. start serving
let req_metrics = RequestMetrics::new(
Arc::clone(&server.common_state.metrics),
MetricFamily::HttpServer,
@ -230,7 +228,9 @@ mod tests {
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
use pretty_assertions::assert_eq;
@ -266,11 +266,14 @@ mod tests {
mem_pool_size: usize::MAX,
}));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
None::<Arc<influxdb3_write::wal::WalImpl>>,
Arc::clone(&time_provider),
SegmentDuration::new_5m(),
)
.await
.unwrap(),
@ -289,6 +292,7 @@ mod tests {
.query_executor(Arc::clone(&query_executor))
.persister(Arc::clone(&persister))
.authorizer(Arc::new(DefaultAuthorizer))
.time_provider(Arc::clone(&time_provider))
.build();
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();
@ -398,11 +402,14 @@ mod tests {
mem_pool_size: usize::MAX,
}));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
None::<Arc<influxdb3_write::wal::WalImpl>>,
Arc::clone(&time_provider),
SegmentDuration::new_5m(),
)
.await
.unwrap(),
@ -421,6 +428,7 @@ mod tests {
.query_executor(Arc::new(query_executor))
.persister(persister)
.authorizer(Arc::new(DefaultAuthorizer))
.time_provider(Arc::clone(&time_provider))
.build();
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();
@ -566,11 +574,16 @@ mod tests {
mem_pool_size: usize::MAX,
}));
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(
1708473607000000000,
)));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
None::<Arc<influxdb3_write::wal::WalImpl>>,
Arc::clone(&time_provider),
SegmentDuration::new_5m(),
)
.await
.unwrap(),
@ -589,6 +602,7 @@ mod tests {
.query_executor(Arc::new(query_executor))
.persister(persister)
.authorizer(Arc::new(DefaultAuthorizer))
.time_provider(Arc::clone(&time_provider))
.build();
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();

View File

@ -12,6 +12,7 @@ datafusion_util.workspace = true
influxdb-line-protocol.workspace = true
iox_catalog.workspace = true
iox_query.workspace = true
iox_time.workspace = true
observability_deps.workspace = true
schema.workspace = true

View File

@ -13,7 +13,7 @@ pub mod wal;
pub mod write_buffer;
use crate::catalog::Catalog;
use crate::paths::ParquetFilePath;
use crate::paths::{ParquetFilePath, SegmentWalFilePath};
use async_trait::async_trait;
use bytes::Bytes;
use data_types::NamespaceName;
@ -22,13 +22,17 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::Expr;
use iox_query::QueryChunk;
use iox_time::Time;
use parquet::format::FileMetaData;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Add;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
#[derive(Debug, Error)]
@ -44,6 +48,9 @@ pub enum Error {
#[error("persister error: {0}")]
Persister(#[from] persister::Error),
#[error("invalid segment duration {0}. Must be one of 1m, 5m, 10m, 15m, 30m, 1h, 2h, 4h")]
InvalidSegmentDuration(String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -67,7 +74,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
&self,
database: NamespaceName<'static>,
lp: &str,
default_time: i64,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> write_buffer::Result<BufferedWriteRequest>;
@ -126,21 +133,155 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub struct SegmentId(u32);
pub type SegmentIdBytes = [u8; 4];
const RANGE_KEY_TIME_FORMAT: &str = "%Y-%m-%dT%H-%M";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentRange {
/// inclusive of this start time
pub start_time: Time,
/// exclusive of this end time
pub end_time: Time,
/// whether the segment contains data with timestamps outside this range
pub contains_data_outside_range: bool,
}
/// The duration of a segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentDuration(Duration);
impl SegmentDuration {
pub fn duration_seconds(&self) -> i64 {
self.0.as_secs() as i64
}
/// Given a time, returns the start time of the segment that contains the time.
pub fn start_time(&self, timestamp_seconds: i64) -> Time {
let duration_seconds = self.duration_seconds();
let rounded_seconds = (timestamp_seconds / duration_seconds) * duration_seconds;
Time::from_timestamp(rounded_seconds, 0).unwrap()
}
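// e.g. with a 15m duration, a timestamp at 13:46:00 rounds down to 13:45:00;
// the integer division truncates to the most recent segment boundary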
pub fn as_duration(&self) -> Duration {
self.0
}
/// Returns the segment duration from a given `SegmentRange`
pub fn from_range(segment_range: SegmentRange) -> Self {
let duration = segment_range.duration();
Self(duration)
}
pub fn new_5m() -> Self {
Self(Duration::from_secs(300))
}
}
impl FromStr for SegmentDuration {
type Err = Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"1m" => Ok(Self(Duration::from_secs(60))),
"5m" => Ok(Self(Duration::from_secs(300))),
"10m" => Ok(Self(Duration::from_secs(600))),
"15m" => Ok(Self(Duration::from_secs(900))),
"30m" => Ok(Self(Duration::from_secs(1800))),
"1h" => Ok(Self(Duration::from_secs(3600))),
"2h" => Ok(Self(Duration::from_secs(7200))),
"4h" => Ok(Self(Duration::from_secs(14400))),
_ => Err(Error::InvalidSegmentDuration(s.to_string())),
}
}
}
impl SegmentRange {
/// Given a time, finds the appropriate start and end time for the given duration.
pub fn from_time_and_duration(
clock_time: Time,
segment_duration: SegmentDuration,
data_outside: bool,
) -> Self {
let start_time = segment_duration.start_time(clock_time.timestamp());
Self {
start_time,
end_time: start_time.add(segment_duration.as_duration()),
contains_data_outside_range: data_outside,
}
}
/// Returns the string key for the segment range
pub fn key(&self) -> String {
format!(
"{}",
self.start_time.date_time().format(RANGE_KEY_TIME_FORMAT)
)
}
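// e.g. a segment starting at 2024-03-01T13:45:00Z keys as "2024-03-01T13-45"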
/// Returns a segment range for the next block of time based on the range of this segment.
pub fn next(&self) -> Self {
Self {
start_time: self.end_time,
end_time: self.end_time.add(self.duration()),
contains_data_outside_range: self.contains_data_outside_range,
}
}
pub fn duration(&self) -> Duration {
self.end_time
.checked_duration_since(self.start_time)
.unwrap()
}
#[cfg(test)]
pub fn test_range() -> Self {
Self {
start_time: Time::from_rfc3339("1970-01-01T00:00:00Z").unwrap(),
end_time: Time::from_rfc3339("1970-01-01T00:05:00Z").unwrap(),
contains_data_outside_range: false,
}
}
}
impl Serialize for SegmentRange {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let start_time = self.start_time.timestamp();
let end_time = self.end_time.timestamp();
let times = (start_time, end_time, self.contains_data_outside_range);
times.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for SegmentRange {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let (start_time, end_time, contains_data_outside_range) =
<(i64, i64, bool)>::deserialize(deserializer)?;
let start_time = Time::from_timestamp(start_time, 0)
.ok_or_else(|| serde::de::Error::custom("start_time is not a valid timestamp"))?;
let end_time = Time::from_timestamp(end_time, 0)
.ok_or_else(|| serde::de::Error::custom("end_time is not a valid timestamp"))?;
Ok(SegmentRange {
start_time,
end_time,
contains_data_outside_range,
})
}
}
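// The serialized form is a (start_seconds, end_seconds, outside_range) tuple,
// so the 5m test range above round-trips as [0, 300, false] in JSON.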
impl SegmentId {
pub fn new(id: u32) -> Self {
Self(id)
}
pub fn as_bytes(&self) -> SegmentIdBytes {
self.0.to_be_bytes()
}
pub fn from_bytes(bytes: SegmentIdBytes) -> Self {
Self(u32::from_be_bytes(bytes))
}
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
@ -206,7 +347,17 @@ pub trait Persister: Debug + Send + Sync + 'static {
}
pub trait Wal: Debug + Send + Sync + 'static {
/// Opens a writer to a segment, either creating a new file or appending to an existing file.
/// Opens a writer to a new segment with the given id and range (start time inclusive, end
/// time exclusive). The range's `contains_data_outside_range` flag indicates whether the
/// segment holds data whose timestamps fall outside that range.
fn new_segment_writer(
&self,
segment_id: SegmentId,
range: SegmentRange,
) -> wal::Result<Box<dyn WalSegmentWriter>>;
/// Opens a writer to an existing segment, reading its last sequence number and making it
/// ready to append new writes.
fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<Box<dyn WalSegmentWriter>>;
/// Opens a reader to a segment file.
@ -232,15 +383,17 @@ pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
fn bytes_written(&self) -> u64;
fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<()>;
fn last_sequence_number(&self) -> SequenceNumber;
}
pub trait WalSegmentReader: Debug + Send + Sync + 'static {
fn id(&self) -> SegmentId;
fn next_batch(&mut self) -> wal::Result<Option<WalOpBatch>>;
fn header(&self) -> &wal::SegmentHeader;
fn path(&self) -> &SegmentWalFilePath;
}
/// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
@ -286,9 +439,6 @@ pub struct BufferedWriteRequest {
pub line_count: usize,
pub field_count: usize,
pub tag_count: usize,
pub total_buffer_memory_used: usize,
pub segment_id: SegmentId,
pub sequence_number: SequenceNumber,
}
/// A persisted Catalog that contains the database, table, and column schemas.
@ -398,9 +548,10 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {
mod test_helpers {
use crate::catalog::{Catalog, DatabaseSchema};
use crate::write_buffer::buffer_segment::WriteBatch;
use crate::write_buffer::{parse_validate_and_update_schema, Partitioner, TableBatch};
use crate::Precision;
use crate::write_buffer::{parse_validate_and_update_schema, TableBatch};
use crate::{Precision, SegmentDuration};
use data_types::NamespaceName;
use iox_time::Time;
use std::collections::HashMap;
use std::sync::Arc;
@ -409,14 +560,15 @@ mod test_helpers {
db_name: &'static str,
lp: &str,
) -> WriteBatch {
let db_name = NamespaceName::new(db_name).unwrap();
let mut write_batch = WriteBatch::default();
let (seq, db) = catalog.db_or_create(db_name).unwrap();
let partitioner = Partitioner::new_per_day_partitioner();
let result = parse_validate_and_update_schema(
let (seq, db) = catalog.db_or_create(db_name.as_str()).unwrap();
let mut result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
0,
db_name.clone(),
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
false,
Precision::Nanosecond,
)
@ -424,24 +576,98 @@ mod test_helpers {
if let Some(db) = result.schema {
catalog.replace_database(seq, Arc::new(db)).unwrap();
}
let db_name = NamespaceName::new(db_name).unwrap();
write_batch.add_db_write(db_name, result.table_batches);
write_batch.add_db_write(
db_name,
result.valid_segmented_data.pop().unwrap().table_batches,
);
write_batch
}
pub(crate) fn lp_to_table_batches(lp: &str, default_time: i64) -> HashMap<String, TableBatch> {
let db = Arc::new(DatabaseSchema::new("foo"));
let partitioner = Partitioner::new_per_day_partitioner();
let result = parse_validate_and_update_schema(
let db_name = NamespaceName::new("foo").unwrap();
let mut result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
default_time,
db_name,
Time::from_timestamp_nanos(default_time),
SegmentDuration::new_5m(),
false,
Precision::Nanosecond,
)
.unwrap();
result.table_batches
result.valid_segmented_data.pop().unwrap().table_batches
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn segment_range_initialization() {
let t = Time::from_rfc3339("2024-03-01T13:46:00Z").unwrap();
let expected = SegmentRange {
start_time: Time::from_rfc3339("2024-03-01T13:45:00Z").unwrap(),
end_time: Time::from_rfc3339("2024-03-01T14:00:00Z").unwrap(),
contains_data_outside_range: false,
};
let actual = SegmentRange::from_time_and_duration(
t,
SegmentDuration::from_str("15m").unwrap(),
false,
);
assert_eq!(expected, actual);
let expected = SegmentRange {
start_time: Time::from_rfc3339("2024-03-01T13:00:00Z").unwrap(),
end_time: Time::from_rfc3339("2024-03-01T14:00:00Z").unwrap(),
contains_data_outside_range: false,
};
let actual = SegmentRange::from_time_and_duration(
t,
SegmentDuration::from_str("1h").unwrap(),
false,
);
assert_eq!(expected, actual);
let expected = SegmentRange {
start_time: Time::from_rfc3339("2024-03-01T14:00:00Z").unwrap(),
end_time: Time::from_rfc3339("2024-03-01T15:00:00Z").unwrap(),
contains_data_outside_range: false,
};
assert_eq!(expected, actual.next());
let expected = SegmentRange {
start_time: Time::from_rfc3339("2024-03-01T12:00:00Z").unwrap(),
end_time: Time::from_rfc3339("2024-03-01T14:00:00Z").unwrap(),
contains_data_outside_range: false,
};
let actual = SegmentRange::from_time_and_duration(
t,
SegmentDuration::from_str("2h").unwrap(),
false,
);
assert_eq!(expected, actual);
let expected = SegmentRange {
start_time: Time::from_rfc3339("2024-03-01T12:00:00Z").unwrap(),
end_time: Time::from_rfc3339("2024-03-01T16:00:00Z").unwrap(),
contains_data_outside_range: false,
};
let actual = SegmentRange::from_time_and_duration(
t,
SegmentDuration::from_str("4h").unwrap(),
false,
);
assert_eq!(expected, actual);
}
}

View File

@ -110,6 +110,12 @@ impl SegmentWalFilePath {
}
}
impl ToString for SegmentWalFilePath {
fn to_string(&self) -> String {
self.0.to_string_lossy().into_owned()
}
}
impl Deref for SegmentWalFilePath {
type Target = Path;

View File

@ -3,14 +3,16 @@
use crate::paths::SegmentWalFilePath;
use crate::{
SegmentFile, SegmentId, SegmentIdBytes, SequenceNumber, Wal, WalOp, WalOpBatch,
WalSegmentReader, WalSegmentWriter,
SegmentFile, SegmentId, SegmentRange, SequenceNumber, Wal, WalOp, WalOpBatch, WalSegmentReader,
WalSegmentWriter,
};
use async_trait::async_trait;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use crc32fast::Hasher;
use datafusion::parquet::file::reader::Length;
use iox_time::Time;
use observability_deps::tracing::{info, warn};
use serde::{Deserialize, Serialize};
use snap::read::FrameDecoder;
use std::fmt::Debug;
use std::{
@ -44,12 +46,8 @@ pub enum Error {
#[from]
source: std::num::TryFromIntError,
},
#[error("invalid segment file {segment_id:?} at {path:?}: {reason}")]
InvalidSegmentFile {
segment_id: SegmentId,
path: SegmentWalFilePath,
reason: String,
},
#[error("invalid segment file {path:?}: {reason}")]
InvalidSegmentFile { path: PathBuf, reason: String },
#[error("unable to read crc for segment {segment_id:?}")]
UnableToReadCrc { segment_id: SegmentId },
@ -79,6 +77,18 @@ pub enum Error {
file_name: String,
source: std::num::ParseIntError,
},
#[error("file exists: {0}")]
FileExists(PathBuf),
#[error("file doens't exist: {0}")]
FileDoesntExist(PathBuf),
#[error("segment start time not open: {0}")]
SegmentStartTimeNotOpen(Time),
#[error("open segment limit reached: {0}")]
OpenSegmentLimitReached(usize),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -104,11 +114,6 @@ impl WalImpl {
Ok(Self { root })
}
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
let writer = WalSegmentWriterImpl::new_or_open(self.root.clone(), segment_id)?;
Ok(Box::new(writer))
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
let reader = WalSegmentReaderImpl::new(self.root.clone(), segment_id)?;
Ok(Box::new(reader))
@ -166,8 +171,18 @@ impl WalImpl {
}
impl Wal for WalImpl {
fn new_segment_writer(
&self,
segment_id: SegmentId,
range: SegmentRange,
) -> Result<Box<dyn WalSegmentWriter>> {
let writer = WalSegmentWriterImpl::new(self.root.clone(), segment_id, range)?;
Ok(Box::new(writer))
}
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
self.open_segment_writer(segment_id)
let writer = WalSegmentWriterImpl::open(self.root.clone(), segment_id)?;
Ok(Box::new(writer))
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
@ -183,6 +198,12 @@ impl Wal for WalImpl {
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SegmentHeader {
pub id: SegmentId,
pub range: SegmentRange,
}
#[derive(Debug)]
pub struct WalSegmentWriterImpl {
segment_id: SegmentId,
@ -194,33 +215,12 @@ pub struct WalSegmentWriterImpl {
}
impl WalSegmentWriterImpl {
pub fn new_or_open(root: PathBuf, segment_id: SegmentId) -> Result<Self> {
pub fn new(root: PathBuf, segment_id: SegmentId, range: SegmentRange) -> Result<Self> {
let path = SegmentWalFilePath::new(root, segment_id);
// if there's already a file there, validate its header and pull the sequence number from the last entry
// if there's already a file there, error out
if path.exists() {
if let Some(file_info) =
WalSegmentReaderImpl::read_segment_file_info_if_exists(path.clone(), segment_id)?
{
let f = OpenOptions::new().write(true).append(true).open(&path)?;
return Ok(Self {
segment_id,
f,
bytes_written: file_info
.bytes_written
.try_into()
.expect("file length must fit in usize"),
sequence_number: file_info.last_sequence_number,
buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
});
} else {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: "file exists but is invalid".to_string(),
});
}
return Err(Error::FileExists(path.to_path_buf()));
}
// it's a new file, initialize it with the header and get ready to start writing
@ -229,13 +229,23 @@ impl WalSegmentWriterImpl {
f.write_all(FILE_TYPE_IDENTIFIER)?;
let file_type_bytes_written = FILE_TYPE_IDENTIFIER.len();
let id_bytes = segment_id.as_bytes();
f.write_all(&id_bytes)?;
let id_bytes_written = id_bytes.len();
let header = SegmentHeader {
id: segment_id,
range,
};
let header_bytes = serde_json::to_vec(&header)?;
f.write_u16::<BigEndian>(
header_bytes
.len()
.try_into()
.expect("header byes longer than u16"),
)?;
f.write_all(&header_bytes)?;
f.sync_all().expect("fsync failure");
let bytes_written = file_type_bytes_written + id_bytes_written;
let bytes_written = file_type_bytes_written + header_bytes.len();
Ok(Self {
segment_id,
@ -245,9 +255,30 @@ impl WalSegmentWriterImpl {
buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
})
}
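// Resulting on-disk layout: FILE_TYPE_IDENTIFIER, a big-endian u16 header
// length, the JSON-encoded SegmentHeader { id, range }, then the checksummed,
// snappy-compressed WalOp batches appended by write_batch.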
pub fn open(root: PathBuf, segment_id: SegmentId) -> Result<Self> {
let path = SegmentWalFilePath::new(root, segment_id);
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
println!("write batch in impl");
if let Some(file_info) =
WalSegmentReaderImpl::read_segment_file_info_if_exists(path.clone())?
{
let f = OpenOptions::new().write(true).append(true).open(&path)?;
Ok(Self {
segment_id,
f,
bytes_written: file_info
.bytes_written
.try_into()
.expect("file length must fit in usize"),
sequence_number: file_info.last_sequence_number,
buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
})
} else {
Err(Error::FileDoesntExist(path.to_path_buf()))
}
}
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<()> {
// Ensure the write buffer is always empty before using it.
self.buffer.clear();
@ -265,7 +296,7 @@ impl WalSegmentWriterImpl {
self.bytes_written += bytes_written;
self.sequence_number = sequence_number;
Ok(self.sequence_number)
Ok(())
}
fn write_bytes(&mut self, data: Vec<u8>) -> Result<usize> {
@ -322,7 +353,7 @@ impl WalSegmentWriter for WalSegmentWriterImpl {
self.bytes_written as u64
}
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<()> {
self.write_batch(ops)
}
@ -357,15 +388,11 @@ impl WalSegmentWriter for WalSegmentWriterNoopImpl {
self.wal_ops_written as u64
}
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<()> {
let sequence_number = self.sequence_number.next();
self.sequence_number = sequence_number;
self.wal_ops_written += ops.len();
println!(
"write_batch called: wal_ops_written: {}",
self.wal_ops_written
);
Ok(sequence_number)
Ok(())
}
fn last_sequence_number(&self) -> SequenceNumber {
@ -376,47 +403,38 @@ impl WalSegmentWriter for WalSegmentWriterNoopImpl {
#[derive(Debug)]
pub struct WalSegmentReaderImpl {
f: BufReader<File>,
segment_id: SegmentId,
path: SegmentWalFilePath,
segment_header: SegmentHeader,
}
impl WalSegmentReaderImpl {
pub fn new(root: impl Into<PathBuf>, segment_id: SegmentId) -> Result<Self> {
let path = SegmentWalFilePath::new(root, segment_id);
let f = BufReader::new(File::open(path.clone())?);
let mut f = BufReader::new(File::open(path.clone())?);
let mut reader = Self { f, segment_id };
let segment_header = read_header(&path, &mut f)?;
let (file_type, id) = reader.read_header()?;
if file_type != FILE_TYPE_IDENTIFIER {
if segment_id != segment_header.id {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: format!(
"expected file type identifier {:?}, got {:?}",
FILE_TYPE_IDENTIFIER, file_type
),
});
}
if id != segment_id.as_bytes() {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
path: path.to_path_buf(),
reason: format!(
"expected segment id {:?} in file, got {:?}",
segment_id.as_bytes(),
id
segment_id, segment_header.id,
),
});
}
let reader = Self {
f,
path,
segment_header,
};
Ok(reader)
}
fn read_segment_file_info_if_exists(
path: SegmentWalFilePath,
segment_id: SegmentId,
) -> Result<Option<ExistingSegmentFileInfo>> {
let f = match File::open(path.clone()) {
Ok(f) => f,
@ -426,24 +444,15 @@ impl WalSegmentReaderImpl {
let bytes_written = f.len().try_into()?;
let mut f = BufReader::new(f);
let segment_header = read_header(&path, &mut f)?;
let mut reader = Self {
f: BufReader::new(f),
segment_id,
f,
path,
segment_header,
};
let (file_type, _id) = reader.read_header()?;
if file_type != FILE_TYPE_IDENTIFIER {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: format!(
"expected file type identifier {:?}, got {:?}",
FILE_TYPE_IDENTIFIER, file_type
),
});
}
let mut last_block = None;
while let Some(block) = reader.next_segment_block()? {
@ -458,7 +467,10 @@ impl WalSegmentReaderImpl {
bytes_written,
}))
} else {
Ok(None)
Ok(Some(ExistingSegmentFileInfo {
last_sequence_number: SequenceNumber::new(0),
bytes_written,
}))
}
}
@ -493,7 +505,7 @@ impl WalSegmentReaderImpl {
if expected_len != actual_compressed_len {
return Err(Error::LengthMismatch {
segment_id: self.segment_id,
segment_id: self.segment_header.id,
expected: expected_len,
actual: actual_compressed_len,
});
@ -501,7 +513,7 @@ impl WalSegmentReaderImpl {
if expected_checksum != actual_checksum {
return Err(Error::ChecksumMismatch {
segment_id: self.segment_id,
segment_id: self.segment_header.id,
expected: expected_checksum,
actual: actual_checksum,
});
@ -509,16 +521,33 @@ impl WalSegmentReaderImpl {
Ok(Some(data))
}
}
fn read_array<const N: usize>(&mut self) -> Result<[u8; N]> {
let mut data = [0u8; N];
self.f.read_exact(&mut data)?;
Ok(data)
fn read_header(path: &SegmentWalFilePath, f: &mut BufReader<File>) -> Result<SegmentHeader> {
let file_type: FileTypeIdentifier = read_array(f)?;
if file_type != FILE_TYPE_IDENTIFIER {
return Err(Error::InvalidSegmentFile {
path: path.to_path_buf(),
reason: format!(
"expected file type identifier {:?}, got {:?}",
FILE_TYPE_IDENTIFIER, file_type
),
});
}
fn read_header(&mut self) -> Result<(FileTypeIdentifier, SegmentIdBytes)> {
Ok((self.read_array()?, self.read_array()?))
}
let len = f.read_u16::<BigEndian>()?;
let mut data = vec![0u8; len.into()];
f.read_exact(&mut data)?;
let header: SegmentHeader = serde_json::from_slice(&data)?;
Ok(header)
}
fn read_array<const N: usize>(f: &mut BufReader<File>) -> Result<[u8; N]> {
let mut data = [0u8; N];
f.read_exact(&mut data)?;
Ok(data)
}
struct ExistingSegmentFileInfo {
@ -527,13 +556,17 @@ struct ExistingSegmentFileInfo {
}
impl WalSegmentReader for WalSegmentReaderImpl {
fn id(&self) -> SegmentId {
self.segment_id
}
fn next_batch(&mut self) -> Result<Option<WalOpBatch>> {
self.next_batch()
}
fn header(&self) -> &SegmentHeader {
&self.segment_header
}
fn path(&self) -> &SegmentWalFilePath {
&self.path
}
}
struct CrcReader<R> {
@ -624,7 +657,9 @@ mod tests {
fn segment_writer_reader() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let mut writer = WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
let mut writer =
WalSegmentWriterImpl::new(dir.clone(), SegmentId::new(0), SegmentRange::test_range())
.unwrap();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
@ -650,15 +685,18 @@ mod tests {
// open the file, write and close it
{
let mut writer =
WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
let mut writer = WalSegmentWriterImpl::new(
dir.clone(),
SegmentId::new(0),
SegmentRange::test_range(),
)
.unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
}
// open it again, send a new write in and close it
{
let mut writer =
WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
let mut writer = WalSegmentWriterImpl::open(dir.clone(), SegmentId::new(0)).unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
}
@ -685,10 +723,14 @@ mod tests {
});
let wal = WalImpl::new(dir.clone()).unwrap();
let mut writer = wal.open_segment_writer(SegmentId::new(0)).unwrap();
let mut writer = wal
.new_segment_writer(SegmentId::new(0), SegmentRange::test_range())
.unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
let mut writer2 = wal.open_segment_writer(SegmentId::new(1)).unwrap();
let mut writer2 = wal
.new_segment_writer(SegmentId::new(1), SegmentRange::test_range())
.unwrap();
writer2.write_batch(vec![wal_op.clone()]).unwrap();
let segments = wal.segment_files().unwrap();

View File

@ -5,10 +5,13 @@
use crate::catalog::Catalog;
use crate::paths::ParquetFilePath;
use crate::write_buffer::flusher::BufferedWriteResult;
use crate::write_buffer::{parse_validate_and_update_catalog, FieldData, Row, TableBatch};
use crate::write_buffer::{
parse_validate_and_update_catalog, Error, FieldData, Row, TableBatch, ValidSegmentedData,
};
use crate::{
wal, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment, Persister, Precision,
SegmentId, SequenceNumber, TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
SegmentDuration, SegmentId, SegmentRange, SequenceNumber, TableParquetFiles, WalOp,
WalSegmentReader, WalSegmentWriter,
};
use arrow::array::{
ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder,
@ -16,8 +19,9 @@ use arrow::array::{
};
use arrow::datatypes::Int32Type;
use arrow::record_batch::RecordBatch;
use data_types::{ColumnType, NamespaceName, TimestampMinMax};
use data_types::{ColumnType, NamespaceName, PartitionKey, TimestampMinMax};
use datafusion_util::stream_from_batch;
use iox_time::Time;
use schema::Schema;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
@ -27,6 +31,8 @@ use tokio::sync::oneshot;
pub struct OpenBufferSegment {
segment_writer: Box<dyn WalSegmentWriter>,
segment_id: SegmentId,
segment_range: SegmentRange,
segment_key: PartitionKey,
buffered_data: BufferedData,
#[allow(dead_code)]
starting_catalog_sequence_number: SequenceNumber,
@ -38,25 +44,38 @@ pub struct OpenBufferSegment {
impl OpenBufferSegment {
pub fn new(
segment_id: SegmentId,
segment_range: SegmentRange,
starting_catalog_sequence_number: SequenceNumber,
segment_writer: Box<dyn WalSegmentWriter>,
buffered_data: Option<(BufferedData, usize)>,
) -> Self {
let (buffered_data, segment_size) = buffered_data.unwrap_or_default();
let segment_key = PartitionKey::from(segment_range.key());
Self {
segment_writer,
segment_id,
segment_range,
segment_key,
starting_catalog_sequence_number,
segment_size,
buffered_data,
}
}
#[allow(dead_code)]
pub fn start_time_matches(&self, t: Time) -> bool {
self.segment_range.start_time == t
}
pub fn segment_id(&self) -> SegmentId {
self.segment_id
}
pub fn write_batch(&mut self, write_batch: Vec<WalOp>) -> wal::Result<SequenceNumber> {
pub fn segment_range(&self) -> &SegmentRange {
&self.segment_range
}
pub fn write_wal_ops(&mut self, write_batch: Vec<WalOp>) -> wal::Result<()> {
self.segment_writer.write_batch(write_batch)
}
@ -67,8 +86,8 @@ impl OpenBufferSegment {
.and_then(|db_buffer| db_buffer.table_buffers.get(table_name).cloned())
}
/// Adds the batch into the in memory buffer. Returns the number of rows in the segment after the write.
pub(crate) fn buffer_writes(&mut self, write_batch: WriteBatch) -> Result<usize> {
/// Adds the batch into the in memory buffer.
pub(crate) fn buffer_writes(&mut self, write_batch: WriteBatch) -> Result<()> {
for (db_name, db_batch) in write_batch.database_batches {
let db_buffer = self
.buffered_data
@ -77,30 +96,31 @@ impl OpenBufferSegment {
.or_default();
for (table_name, table_batch) in db_batch.table_batches {
let table_buffer = db_buffer.table_buffers.entry(table_name).or_default();
for (partition_key, partition_batch) in table_batch.partition_batches {
let partition_buffer = table_buffer
.partition_buffers
.entry(partition_key)
.or_default();
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
self.segment_size += partition_batch.rows.len();
partition_buffer.add_rows(partition_batch.rows);
}
let table_buffer = db_buffer
.table_buffers
.entry(table_name)
.or_insert_with(|| TableBuffer {
segment_key: self.segment_key.clone(),
rows: vec![],
timestamp_min: i64::MAX,
timestamp_max: i64::MIN,
});
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
self.segment_size += table_batch.rows.len();
table_buffer.add_rows(table_batch.rows);
}
}
Ok(self.segment_size)
Ok(())
}
#[allow(dead_code)]
pub fn into_closed_segment(self, catalog: Arc<Catalog>) -> ClosedBufferSegment {
ClosedBufferSegment::new(
self.segment_id,
self.segment_range,
self.segment_key,
self.starting_catalog_sequence_number,
catalog.sequence_number(),
self.segment_writer,
@ -116,43 +136,54 @@ pub(crate) fn load_buffer_from_segment(
) -> Result<(BufferedData, usize)> {
let mut segment_size = 0;
let mut buffered_data = BufferedData::default();
let segment_key = PartitionKey::from(segment_reader.header().range.key());
let segment_duration = SegmentDuration::from_range(segment_reader.header().range);
while let Some(batch) = segment_reader.next_batch()? {
for wal_op in batch.ops {
println!("wal_op: {:?}", wal_op);
match wal_op {
WalOp::LpWrite(write) => {
let validated_write = parse_validate_and_update_catalog(
&write.db_name,
let mut validated_write = parse_validate_and_update_catalog(
NamespaceName::new(write.db_name.clone())?,
&write.lp,
catalog,
write.default_time,
Time::from_timestamp_nanos(write.default_time),
segment_duration,
false,
Precision::Nanosecond,
)?;
println!("validated_write: {:?}", validated_write);
let db_buffer = buffered_data
.database_buffers
.entry(write.db_name)
.or_default();
for (table_name, table_batch) in validated_write.table_batches {
let table_buffer = db_buffer.table_buffers.entry(table_name).or_default();
// there should only ever be data for a single segment as this is all read
// from one segment file
if validated_write.valid_segmented_data.len() != 1 {
return Err(Error::WalOpForMultipleSegments(
segment_reader.path().to_string(),
));
}
for (partition_key, partition_batch) in table_batch.partition_batches {
let partition_buffer = table_buffer
.partition_buffers
.entry(partition_key)
.or_default();
let segment_data = validated_write.valid_segmented_data.pop().unwrap();
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
segment_size += partition_batch.rows.len();
for (table_name, table_batch) in segment_data.table_batches {
let table_buffer = db_buffer
.table_buffers
.entry(table_name)
.or_insert_with(|| TableBuffer {
segment_key: segment_key.clone(),
rows: vec![],
timestamp_min: i64::MAX,
timestamp_max: i64::MIN,
});
println!("partition_batch: {:?}", partition_batch);
// TODO: for now we'll just have the number of rows represent the segment size. The entire
// buffer is going to get refactored to use different structures, so this will change.
segment_size += table_batch.rows.len();
partition_buffer.add_rows(partition_batch.rows);
}
table_buffer.add_rows(table_batch.rows);
}
}
}
@ -187,48 +218,16 @@ impl DatabaseBatch {
fn add_table_batches(&mut self, table_batches: HashMap<String, TableBatch>) {
for (table_name, table_batch) in table_batches {
let write_table_batch = self.table_batches.entry(table_name).or_default();
for (partition_key, partition_batch) in table_batch.partition_batches {
let write_partition_batch = write_table_batch
.partition_batches
.entry(partition_key)
.or_default();
write_partition_batch.rows.extend(partition_batch.rows);
}
write_table_batch.rows.extend(table_batch.rows);
}
}
}
pub struct BufferedWrite {
pub wal_op: WalOp,
pub database_write: DatabaseWrite,
pub segmented_data: Vec<ValidSegmentedData>,
pub response_tx: oneshot::Sender<BufferedWriteResult>,
}
pub struct DatabaseWrite {
pub(crate) db_name: NamespaceName<'static>,
pub(crate) table_batches: HashMap<String, TableBatch>,
}
impl DatabaseWrite {
pub fn new(
db_name: NamespaceName<'static>,
table_batches: HashMap<String, TableBatch>,
) -> Self {
Self {
db_name,
table_batches,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct WriteSummary {
pub segment_id: SegmentId,
pub sequence_number: SequenceNumber,
pub buffer_size: usize,
}
#[derive(Debug, Default, Eq, PartialEq)]
pub struct BufferedData {
database_buffers: HashMap<String, DatabaseBuffer>,
@ -239,36 +238,15 @@ struct DatabaseBuffer {
table_buffers: HashMap<String, TableBuffer>,
}
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct TableBuffer {
pub partition_buffers: HashMap<String, PartitionBuffer>,
}
impl TableBuffer {
#[allow(dead_code)]
pub fn partition_buffer(&self, partition_key: &str) -> Option<&PartitionBuffer> {
self.partition_buffers.get(partition_key)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PartitionBuffer {
pub struct TableBuffer {
pub segment_key: PartitionKey,
rows: Vec<Row>,
timestamp_min: i64,
timestamp_max: i64,
}
impl Default for PartitionBuffer {
fn default() -> Self {
Self {
rows: Vec::new(),
timestamp_min: i64::MAX,
timestamp_max: i64::MIN,
}
}
}
impl PartitionBuffer {
impl TableBuffer {
pub fn add_rows(&mut self, rows: Vec<Row>) {
self.rows.reserve(rows.len());
for row in rows {
@ -396,6 +374,8 @@ impl Builder {
#[derive(Debug)]
pub struct ClosedBufferSegment {
pub segment_id: SegmentId,
pub segment_range: SegmentRange,
pub segment_key: PartitionKey,
pub catalog_start_sequence_number: SequenceNumber,
pub catalog_end_sequence_number: SequenceNumber,
segment_writer: Box<dyn WalSegmentWriter>,
@ -405,8 +385,11 @@ pub struct ClosedBufferSegment {
impl ClosedBufferSegment {
#[allow(dead_code)]
#[allow(clippy::too_many_arguments)]
fn new(
segment_id: SegmentId,
segment_range: SegmentRange,
segment_key: PartitionKey,
catalog_start_sequence_number: SequenceNumber,
catalog_end_sequence_number: SequenceNumber,
segment_writer: Box<dyn WalSegmentWriter>,
@ -415,6 +398,8 @@ impl ClosedBufferSegment {
) -> Self {
Self {
segment_id,
segment_range,
segment_key,
catalog_start_sequence_number,
catalog_end_sequence_number,
segment_writer,
@ -456,37 +441,35 @@ impl ClosedBufferSegment {
sort_key: vec![],
};
// persist every partition buffer
for (partition_key, partition_buffer) in &table_buffer.partition_buffers {
let data = partition_buffer
.rows_to_record_batch(table.schema(), table.columns());
let row_count = data.num_rows();
let batch_stream = stream_from_batch(table.schema().as_arrow(), data);
let parquet_file_path = ParquetFilePath::new_with_parititon_key(
db_name,
&table.name,
partition_key,
self.segment_id.0,
);
let path = parquet_file_path.to_string();
let (size_bytes, meta) = persister
.persist_parquet_file(parquet_file_path, batch_stream)
.await?;
// persist every table buffer
let data =
table_buffer.rows_to_record_batch(table.schema(), table.columns());
let row_count = data.num_rows();
let batch_stream = stream_from_batch(table.schema().as_arrow(), data);
let parquet_file_path = ParquetFilePath::new_with_parititon_key(
db_name,
&table.name,
&table_buffer.segment_key.to_string(),
self.segment_id.0,
);
let path = parquet_file_path.to_string();
let (size_bytes, meta) = persister
.persist_parquet_file(parquet_file_path, batch_stream)
.await?;
let parquet_file = ParquetFile {
path,
size_bytes,
row_count: row_count as u64,
min_time: partition_buffer.timestamp_min,
max_time: partition_buffer.timestamp_max,
};
table_parquet_files.parquet_files.push(parquet_file);
let parquet_file = ParquetFile {
path,
size_bytes,
row_count: row_count as u64,
min_time: table_buffer.timestamp_min,
max_time: table_buffer.timestamp_max,
};
table_parquet_files.parquet_files.push(parquet_file);
segment_parquet_size_bytes += size_bytes;
segment_row_count += meta.num_rows as u64;
segment_max_time = segment_max_time.max(partition_buffer.timestamp_max);
segment_min_time = segment_min_time.min(partition_buffer.timestamp_min);
}
segment_parquet_size_bytes += size_bytes;
segment_row_count += meta.num_rows as u64;
segment_max_time = segment_max_time.max(table_buffer.timestamp_max);
segment_min_time = segment_min_time.min(table_buffer.timestamp_min);
if !table_parquet_files.parquet_files.is_empty() {
database_tables
@ -535,6 +518,7 @@ mod tests {
fn buffers_rows() {
let mut open_segment = OpenBufferSegment::new(
SegmentId::new(0),
SegmentRange::test_range(),
SequenceNumber::new(0),
Box::new(WalSegmentWriterNoopImpl::new(SegmentId::new(0))),
None,
@ -554,24 +538,29 @@ mod tests {
open_segment.buffer_writes(write_batch).unwrap();
let cpu_table = open_segment.table_buffer(&db_name, "cpu").unwrap();
let cpu_partition = cpu_table.partition_buffers.get("1970-01-01").unwrap();
assert_eq!(cpu_partition.rows.len(), 2);
assert_eq!(cpu_partition.timestamp_min, 10);
assert_eq!(cpu_partition.timestamp_max, 30);
assert_eq!(cpu_table.rows.len(), 2);
assert_eq!(cpu_table.timestamp_min, 10);
assert_eq!(cpu_table.timestamp_max, 30);
let mem_table = open_segment.table_buffer(&db_name, "mem").unwrap();
let mem_partition = mem_table.partition_buffers.get("1970-01-01").unwrap();
assert_eq!(mem_partition.rows.len(), 1);
assert_eq!(mem_partition.timestamp_min, 20);
assert_eq!(mem_partition.timestamp_max, 20);
assert_eq!(mem_table.rows.len(), 1);
assert_eq!(mem_table.timestamp_min, 20);
assert_eq!(mem_table.timestamp_max, 20);
}
#[tokio::test]
async fn persist_closed_buffer() {
const SEGMENT_KEY: &str = "1970-01-01T00-00";
let segment_id = SegmentId::new(4);
let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
let mut open_segment =
OpenBufferSegment::new(segment_id, SequenceNumber::new(0), segment_writer, None);
let mut open_segment = OpenBufferSegment::new(
segment_id,
SegmentRange::test_range(),
SequenceNumber::new(0),
segment_writer,
None,
);
let catalog = Catalog::new();
@ -585,7 +574,7 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, "db1", lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.write_wal_ops(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
let catalog = Arc::new(catalog);
@ -628,7 +617,7 @@ mod tests {
// file number of the path should match the segment id
assert_eq!(
cpu_parqet.path,
ParquetFilePath::new_with_parititon_key("db1", "cpu", "1970-01-01", 4).to_string()
ParquetFilePath::new_with_parititon_key("db1", "cpu", SEGMENT_KEY, 4).to_string()
);
assert_eq!(cpu_parqet.row_count, 1);
assert_eq!(cpu_parqet.min_time, 10);
@ -640,7 +629,7 @@ mod tests {
// file number of the path should match the segment id
assert_eq!(
mem_parqet.path,
ParquetFilePath::new_with_parititon_key("db1", "mem", "1970-01-01", 4).to_string()
ParquetFilePath::new_with_parititon_key("db1", "mem", SEGMENT_KEY, 4).to_string()
);
assert_eq!(mem_parqet.row_count, 2);
assert_eq!(mem_parqet.min_time, 15);

View File

@ -1,10 +1,10 @@
//! Buffers writes and flushes them to the configured wal
use crate::write_buffer::buffer_segment::{BufferedWrite, DatabaseWrite, WriteBatch, WriteSummary};
use crate::write_buffer::{Error, SegmentState, TableBatch};
use crate::{wal, SequenceNumber, WalOp};
use crate::write_buffer::buffer_segment::{BufferedWrite, WriteBatch};
use crate::write_buffer::{Error, SegmentState, ValidSegmentedData};
use crate::{wal, Wal, WalOp};
use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use data_types::NamespaceName;
use iox_time::Time;
use observability_deps::tracing::debug;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
@ -23,10 +23,13 @@ const BUFFER_CHANNEL_LIMIT: usize = 10_000;
// are buffered. If there is an error, it'll be here
#[derive(Debug, Clone)]
pub enum BufferedWriteResult {
Success(WriteSummary),
Success(()),
Error(String),
}
type SegmentedWalOps = HashMap<Time, Vec<WalOp>>;
type SegmentedWriteBatch = HashMap<Time, WriteBatch>;
/// The WriteBufferFlusher buffers writes and flushes them to the configured wal. The wal IO is done in a native
/// thread rather than a tokio task to avoid blocking the tokio runtime. As referenced in this post, continuous
/// long-running IO threads should be off the tokio runtime: `<https://ryhl.io/blog/async-what-is-blocking/>`.
@ -40,7 +43,7 @@ pub struct WriteBufferFlusher {
}
impl WriteBufferFlusher {
pub fn new(segment_state: Arc<RwLock<SegmentState>>) -> Self {
pub fn new<W: Wal>(segment_state: Arc<RwLock<SegmentState<W>>>) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(());
let (buffer_tx, buffer_rx) = mpsc::channel(BUFFER_CHANNEL_LIMIT);
let (io_flush_tx, io_flush_rx) = bounded(1);
@ -80,16 +83,13 @@ impl WriteBufferFlusher {
pub async fn write_to_open_segment(
&self,
db_name: NamespaceName<'static>,
table_batches: HashMap<String, TableBatch>,
wal_op: WalOp,
) -> crate::write_buffer::Result<WriteSummary> {
segmented_data: Vec<ValidSegmentedData>,
) -> crate::write_buffer::Result<()> {
let (response_tx, response_rx) = oneshot::channel();
self.buffer_tx
.send(BufferedWrite {
wal_op,
database_write: DatabaseWrite::new(db_name, table_batches),
segmented_data,
response_tx,
})
.await
@ -98,21 +98,21 @@ impl WriteBufferFlusher {
let summary = response_rx.await.expect("wal op buffer thread is dead");
match summary {
BufferedWriteResult::Success(summary) => Ok(summary),
BufferedWriteResult::Success(_) => Ok(()),
BufferedWriteResult::Error(e) => Err(Error::BufferSegmentError(e)),
}
}
}
async fn run_wal_op_buffer(
segment_state: Arc<RwLock<SegmentState>>,
async fn run_wal_op_buffer<W: Wal>(
segment_state: Arc<RwLock<SegmentState<W>>>,
mut buffer_rx: mpsc::Receiver<BufferedWrite>,
io_flush_tx: CrossbeamSender<Vec<WalOp>>,
io_flush_notify_rx: CrossbeamReceiver<wal::Result<SequenceNumber>>,
io_flush_tx: CrossbeamSender<SegmentedWalOps>,
io_flush_notify_rx: CrossbeamReceiver<wal::Result<()>>,
mut shutdown: watch::Receiver<()>,
) {
let mut ops = Vec::new();
let mut write_batch = crate::write_buffer::buffer_segment::WriteBatch::default();
let mut ops = SegmentedWalOps::new();
let mut write_batch = SegmentedWriteBatch::new();
let mut notifies = Vec::new();
let mut interval = tokio::time::interval(BUFFER_FLUSH_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
@ -121,8 +121,13 @@ async fn run_wal_op_buffer(
// select on either buffering an op, ticking the flush interval, or shutting down
select! {
Some(buffered_write) = buffer_rx.recv() => {
ops.push(buffered_write.wal_op);
write_batch.add_db_write(buffered_write.database_write.db_name, buffered_write.database_write.table_batches);
for segmented_data in buffered_write.segmented_data {
let segment_ops = ops.entry(segmented_data.segment_start).or_default();
segment_ops.push(segmented_data.wal_op);
let segment_write_batch = write_batch.entry(segmented_data.segment_start).or_default();
segment_write_batch.add_db_write(segmented_data.database_name, segmented_data.table_batches);
}
notifies.push(buffered_write.response_tx);
},
_ = interval.tick() => {
@ -130,23 +135,23 @@ async fn run_wal_op_buffer(
continue;
}
let ops_written = ops.len();
// send ops into IO flush channel and wait for response
io_flush_tx.send(ops).expect("wal io thread is dead");
let res = match io_flush_notify_rx.recv().expect("wal io thread is dead") {
Ok(sequence_number) => {
let open_segment = &mut segment_state.write().open_segment;
Ok(()) => {
let mut err = BufferedWriteResult::Success(());
match open_segment.buffer_writes(write_batch) {
Ok(buffer_size) => BufferedWriteResult::Success(WriteSummary {
segment_id: open_segment.segment_id(),
sequence_number,
buffer_size,
}),
Err(e) => BufferedWriteResult::Error(e.to_string()),
let mut segment_state = segment_state.write();
for (time, write_batch) in write_batch {
if let Err(e) = segment_state.write_batch_to_segment(time, write_batch) {
err = BufferedWriteResult::Error(e.to_string());
break;
}
}
err
},
Err(e) => BufferedWriteResult::Error(e.to_string()),
};
@ -157,9 +162,9 @@ async fn run_wal_op_buffer(
}
// reset the buffers
ops = Vec::with_capacity(ops_written);
write_batch = WriteBatch::default();
notifies = Vec::with_capacity(ops_written);
ops = SegmentedWalOps::new();
write_batch = SegmentedWriteBatch::new();
notifies = Vec::new();
},
_ = shutdown.changed() => {
// shutdown has been requested
@ -170,13 +175,13 @@ async fn run_wal_op_buffer(
}
}
fn run_io_flush(
segment_state: Arc<RwLock<SegmentState>>,
buffer_rx: CrossbeamReceiver<Vec<WalOp>>,
buffer_notify: CrossbeamSender<wal::Result<SequenceNumber>>,
fn run_io_flush<W: Wal>(
segment_state: Arc<RwLock<SegmentState<W>>>,
buffer_rx: CrossbeamReceiver<SegmentedWalOps>,
buffer_notify: CrossbeamSender<wal::Result<()>>,
) {
loop {
let batch = match buffer_rx.recv() {
let segmented_wal_ops = match buffer_rx.recv() {
Ok(batch) => batch,
Err(_) => {
// the buffer channel has closed, it's shutdown
@ -186,68 +191,100 @@ fn run_io_flush(
};
let mut state = segment_state.write();
let res = state.open_segment.write_batch(batch);
buffer_notify.send(res).expect("buffer flusher is dead");
        // write the ops to the segment files, notifying the flusher exactly once
        // per batch: the first error short-circuits the rest of the batch
        let mut res = Ok(());
        for (time, wal_ops) in segmented_wal_ops {
            res = state.write_ops_to_segment(time, wal_ops);
            if res.is_err() {
                break;
            }
        }
        buffer_notify.send(res).expect("buffer flusher is dead");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::lp_to_table_batches;
use crate::wal::WalSegmentWriterNoopImpl;
use crate::catalog::Catalog;
use crate::wal::{WalImpl, WalSegmentWriterNoopImpl};
use crate::write_buffer::buffer_segment::OpenBufferSegment;
use crate::{LpWriteOp, SegmentId};
use crate::write_buffer::parse_validate_and_update_catalog;
use crate::{Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber};
use data_types::NamespaceName;
#[tokio::test]
async fn flushes_to_open_segment() {
let segment_id = SegmentId::new(3);
let open_segment = OpenBufferSegment::new(
segment_id,
SegmentRange::test_range(),
SequenceNumber::new(0),
Box::new(WalSegmentWriterNoopImpl::new(segment_id)),
None,
);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));
let next_segment_id = segment_id.next();
let next_segment_range = SegmentRange::test_range().next();
let next_segment = OpenBufferSegment::new(
next_segment_id,
next_segment_range,
SequenceNumber::new(0),
Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)),
None,
);
let catalog = Arc::new(Catalog::new());
let segment_state = Arc::new(RwLock::new(SegmentState::<WalImpl>::new(
SegmentDuration::new_5m(),
next_segment_id,
Arc::clone(&catalog),
vec![open_segment, next_segment],
vec![],
None,
)));
let flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));
let db_name = NamespaceName::new("db1").unwrap();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: "cpu bar=1 10".to_string(),
default_time: 0,
});
let data = lp_to_table_batches("cpu bar=1 10", 0);
let write_summary = flusher
.write_to_open_segment(db_name.clone(), data, wal_op)
let ingest_time = Time::from_timestamp_nanos(0);
let res = parse_validate_and_update_catalog(
db_name.clone(),
"cpu bar=1 10",
&catalog,
ingest_time,
SegmentDuration::new_5m(),
false,
Precision::Nanosecond,
)
.unwrap();
flusher
.write_to_open_segment(res.valid_segmented_data)
.await
.unwrap();
assert_eq!(write_summary.segment_id, segment_id);
assert_eq!(write_summary.sequence_number, SequenceNumber::new(1));
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: "cpu bar=1 20".to_string(),
default_time: 0,
});
let data = lp_to_table_batches("cpu bar=1 20", 0);
let write_summary = flusher
.write_to_open_segment(db_name.clone(), data, wal_op)
let res = parse_validate_and_update_catalog(
db_name.clone(),
"cpu bar=1 20",
&catalog,
ingest_time,
SegmentDuration::new_5m(),
false,
Precision::Nanosecond,
)
.unwrap();
flusher
.write_to_open_segment(res.valid_segmented_data)
.await
.unwrap();
assert_eq!(write_summary.sequence_number, SequenceNumber::new(2));
let state = segment_state.read();
assert_eq!(state.open_segment.segment_id(), segment_id);
let segment = state.segment_for_time(ingest_time).unwrap();
let table_buffer = state
.open_segment
.table_buffer(db_name.as_str(), "cpu")
.unwrap();
let partition_buffer = table_buffer.partition_buffer("1970-01-01").unwrap();
assert_eq!(partition_buffer.row_count(), 2);
assert_eq!(segment.segment_id(), segment_id);
let table_buffer = segment.table_buffer(db_name.as_str(), "cpu").unwrap();
assert_eq!(table_buffer.row_count(), 2);
}
}


@ -7,8 +7,9 @@ use crate::write_buffer::{
buffer_segment::{load_buffer_from_segment, ClosedBufferSegment, OpenBufferSegment},
Result,
};
use crate::Wal;
use crate::{PersistedCatalog, PersistedSegment, Persister, SegmentId};
use crate::{SegmentDuration, SegmentRange, Wal};
use iox_time::Time;
use std::sync::Arc;
use super::Error;
@ -19,14 +20,17 @@ const SEGMENTS_TO_LOAD: usize = 1000;
#[derive(Debug)]
pub struct LoadedState {
pub catalog: Arc<Catalog>,
pub open_segment: OpenBufferSegment,
pub open_segments: Vec<OpenBufferSegment>,
pub persisting_buffer_segments: Vec<ClosedBufferSegment>,
pub persisted_segments: Vec<PersistedSegment>,
pub last_segment_id: SegmentId,
}
pub async fn load_starting_state<W, P>(
persister: Arc<P>,
wal: Option<Arc<W>>,
server_load_time: Time,
segment_duration: SegmentDuration,
) -> Result<LoadedState>
where
W: Wal,
@ -42,76 +46,97 @@ where
.last()
.map(|s| s.segment_id)
.unwrap_or(SegmentId::new(0));
let mut open_segment_id = last_persisted_segment_id.next();
let mut persisting_buffer_segments = Vec::new();
let open_segment = if let Some(wal) = wal {
let current_segment_range =
SegmentRange::from_time_and_duration(server_load_time, segment_duration, false);
let next_segment_range = current_segment_range.next();
let mut open_segments = Vec::new();
if let Some(wal) = wal {
// read any segments that don't show up in the list of persisted segments
// first load up any segments from the wal that haven't been persisted yet, except for the
// last one, which is the open segment.
let wal_segments = wal.segment_files()?;
if !wal_segments.is_empty() {
// update the segment_id of the open segment to be for the last wal file
open_segment_id = wal_segments.last().unwrap().segment_id;
for segment_file in wal_segments.iter().take(wal_segments.len() - 1) {
// if persisted segemnts is empty, load all segments from the wal, otherwise
// only load segments that haven't been persisted yet
if segment_file.segment_id >= last_persisted_segment_id
&& !persisted_segments.is_empty()
{
continue;
}
for segment_file in wal_segments {
// if persisted segments is empty, load all segments from the wal, otherwise
// only load segments that haven't been persisted yet
if segment_file.segment_id <= last_persisted_segment_id
&& !persisted_segments.is_empty()
{
continue;
}
let segment_reader = wal.open_segment_reader(segment_file.segment_id)?;
let starting_sequence_number = catalog.sequence_number();
let buffer = load_buffer_from_segment(&catalog, segment_reader)?;
let starting_sequence_number = catalog.sequence_number();
let segment_reader = wal.open_segment_reader(segment_file.segment_id)?;
let segment_header = *segment_reader.header();
let buffer = load_buffer_from_segment(&catalog, segment_reader)?;
let segment = OpenBufferSegment::new(
segment_file.segment_id,
starting_sequence_number,
Box::new(WalSegmentWriterNoopImpl::new(segment_file.segment_id)),
Some(buffer),
);
let closed_segment = segment.into_closed_segment(Arc::clone(&catalog));
persisting_buffer_segments.push(closed_segment);
let segment = OpenBufferSegment::new(
segment_header.id,
segment_header.range,
starting_sequence_number,
wal.open_segment_writer(segment_file.segment_id)?,
Some(buffer),
);
// if it's the current or next segment, we want to keep it open rather than move it to
// a persisting state
if segment_header.range == current_segment_range
|| segment_header.range == next_segment_range
{
open_segments.push(segment);
} else {
persisting_buffer_segments.push(segment.into_closed_segment(Arc::clone(&catalog)));
}
}
// read the last segment into an open segment
let segment_reader = match wal.open_segment_reader(open_segment_id) {
Ok(reader) => Some(reader),
Err(crate::wal::Error::Io { source, .. })
if source.kind() == std::io::ErrorKind::NotFound =>
{
None
}
Err(e) => return Err(e.into()),
};
let buffered = match segment_reader {
Some(reader) => Some(load_buffer_from_segment(&catalog, reader)?),
None => None,
};
let segment_writer = wal.open_segment_writer(open_segment_id)?;
OpenBufferSegment::new(
open_segment_id,
catalog.sequence_number(),
segment_writer,
buffered,
)
if open_segments.is_empty() {
// ensure that we open up a segment for the "now" period of time
let current_segment_id = last_persisted_segment_id.next();
let current_segment = OpenBufferSegment::new(
current_segment_id,
current_segment_range,
catalog.sequence_number(),
wal.new_segment_writer(current_segment_id, current_segment_range)?,
None,
);
open_segments.push(current_segment);
}
} else {
OpenBufferSegment::new(
open_segment_id,
// ensure that we open up a segment for the "now" period of time
let current_segment_id = last_persisted_segment_id.next();
let current_segment = OpenBufferSegment::new(
current_segment_id,
current_segment_range,
catalog.sequence_number(),
Box::new(WalSegmentWriterNoopImpl::new(open_segment_id)),
Box::new(WalSegmentWriterNoopImpl::new(current_segment_id)),
None,
)
);
open_segments.push(current_segment);
};
let last_segment_id = open_segments
.iter()
.map(|s| s.segment_id())
.max()
.unwrap_or(SegmentId::new(0))
.max(last_persisted_segment_id)
.max(
persisted_segments
.last()
.map(|s| s.segment_id)
.unwrap_or(SegmentId::new(0)),
);
Ok(LoadedState {
catalog,
open_segment,
last_segment_id,
open_segments,
persisting_buffer_segments,
persisted_segments,
})
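// --- Sketch (a hedged analog, not the crate's implementation): the range
// arithmetic the loader relies on. The current range is the duration-aligned
// window containing the load time, and `next()` is the window that follows;
// `SegmentRange::from_time_and_duration` is assumed to floor in this way.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Range {
    start_secs: i64,
    duration_secs: i64,
}

impl Range {
    fn containing(now_secs: i64, duration_secs: i64) -> Self {
        // floor `now` to the nearest duration boundary
        let start_secs = now_secs - now_secs.rem_euclid(duration_secs);
        Self { start_secs, duration_secs }
    }
    fn next(self) -> Self {
        Self { start_secs: self.start_secs + self.duration_secs, ..self }
    }
}
// With a 5m duration, loading at t = 6 * 60s puts the current window at
// [300s, 600s), which is why the tests below reload with
// `Time::from_timestamp(6 * 60, 0)` to land in the next segment period.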
@ -123,8 +148,12 @@ mod tests {
use crate::persister::PersisterImpl;
use crate::test_helpers::lp_to_write_batch;
use crate::wal::{WalImpl, WalSegmentWriterNoopImpl};
use crate::{DatabaseTables, LpWriteOp, ParquetFile, SequenceNumber, TableParquetFiles, WalOp};
use crate::{
DatabaseTables, LpWriteOp, ParquetFile, SegmentRange, SequenceNumber, TableParquetFiles,
WalOp,
};
use arrow_util::assert_batches_eq;
use iox_time::Time;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use pretty_assertions::assert_eq;
@ -137,8 +166,13 @@ mod tests {
let segment_id = SegmentId::new(4);
let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id));
let mut open_segment =
OpenBufferSegment::new(segment_id, SequenceNumber::new(0), segment_writer, None);
let mut open_segment = OpenBufferSegment::new(
segment_id,
SegmentRange::test_range(),
SequenceNumber::new(0),
segment_writer,
None,
);
let catalog = Catalog::new();
@ -152,7 +186,7 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, "db1", lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.write_wal_ops(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
let catalog = Arc::new(catalog);
@ -162,15 +196,23 @@ mod tests {
.await
.unwrap();
let loaded_state = load_starting_state(persister, None::<Arc<crate::wal::WalImpl>>)
.await
.unwrap();
let loaded_state = load_starting_state(
persister,
None::<Arc<crate::wal::WalImpl>>,
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
)
.await
.unwrap();
let expected_catalog = catalog.clone_inner();
let loaded_catalog = loaded_state.catalog.clone_inner();
assert_eq!(expected_catalog, loaded_catalog);
// the open segment it creates should be the next value in line
assert_eq!(loaded_state.open_segment.segment_id(), SegmentId::new(5));
assert_eq!(
loaded_state.open_segments[0].segment_id(),
SegmentId::new(5)
);
let persisted_segment = loaded_state.persisted_segments.first().unwrap();
assert_eq!(persisted_segment.segment_id, segment_id);
assert_eq!(persisted_segment.segment_row_count, 3);
@ -194,6 +236,8 @@ mod tests {
.len(),
1
);
assert_eq!(loaded_state.last_segment_id, SegmentId::new(5));
}
#[tokio::test]
@ -206,11 +250,18 @@ mod tests {
let LoadedState {
catalog,
mut open_segment,
mut open_segments,
..
} = load_starting_state(Arc::clone(&persister), Some(Arc::clone(&wal)))
.await
.unwrap();
} = load_starting_state(
Arc::clone(&persister),
Some(Arc::clone(&wal)),
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
)
.await
.unwrap();
let mut current_segment = open_segments.pop().unwrap();
let lp = "cpu,tag1=cupcakes bar=1 10\nmem,tag2=turtles bar=3 15\nmem,tag2=snakes bar=2 20";
@ -222,10 +273,18 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
current_segment.write_wal_ops(vec![wal_op.clone()]).unwrap();
current_segment.buffer_writes(write_batch).unwrap();
let loaded_state = load_starting_state(persister, Some(wal)).await.unwrap();
let loaded_state = load_starting_state(
persister,
Some(wal),
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
)
.await
.unwrap();
let current_segment = loaded_state.open_segments.first().unwrap();
assert!(loaded_state.persisting_buffer_segments.is_empty());
assert!(loaded_state.persisted_segments.is_empty());
@ -235,12 +294,9 @@ mod tests {
assert!(db.tables.contains_key("mem"));
let cpu_table = db.get_table("cpu").unwrap();
let cpu_data = open_segment
let cpu_data = current_segment
.table_buffer(db_name, "cpu")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&cpu_table.schema, cpu_table.columns());
let expected = vec![
"+-----+----------+--------------------------------+",
@ -252,12 +308,9 @@ mod tests {
assert_batches_eq!(&expected, &[cpu_data]);
let mem_table = db.get_table("mem").unwrap();
let mem_data = open_segment
let mem_data = current_segment
.table_buffer(db_name, "mem")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&mem_table.schema, mem_table.columns());
let expected = vec![
"+-----+---------+--------------------------------+",
@ -268,6 +321,8 @@ mod tests {
"+-----+---------+--------------------------------+",
];
assert_batches_eq!(&expected, &[mem_data]);
assert_eq!(loaded_state.last_segment_id, SegmentId::new(1));
}
#[tokio::test]
@ -280,11 +335,18 @@ mod tests {
let LoadedState {
catalog,
mut open_segment,
mut open_segments,
..
} = load_starting_state(Arc::clone(&persister), Some(Arc::clone(&wal)))
.await
.unwrap();
} = load_starting_state(
Arc::clone(&persister),
Some(Arc::clone(&wal)),
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
)
.await
.unwrap();
let mut current_segment = open_segments.pop().unwrap();
let lp = "cpu,tag1=cupcakes bar=1 10\nmem,tag2=turtles bar=3 15\nmem,tag2=snakes bar=2 20";
@ -296,25 +358,22 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
current_segment.write_wal_ops(vec![wal_op]).unwrap();
current_segment.buffer_writes(write_batch).unwrap();
let segment_id = open_segment.segment_id();
let next_segment_id = open_segment.segment_id().next();
let segment_id = current_segment.segment_id();
open_segment
let next_segment_id = segment_id.next();
let next_segment_range = current_segment.segment_range().next();
// close and persist the current segment
current_segment
.into_closed_segment(Arc::clone(&catalog))
.persist(Arc::clone(&persister))
.await
.unwrap();
let mut open_segment = OpenBufferSegment::new(
next_segment_id,
catalog.sequence_number(),
wal.open_segment_writer(next_segment_id).unwrap(),
None,
);
// write data into the next segment
let lp = "cpu,tag1=cupcakes bar=3 20\nfoo val=1 123";
let wal_op = WalOp::LpWrite(LpWriteOp {
@ -325,17 +384,39 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
let segment_writer = wal
.new_segment_writer(next_segment_id, next_segment_range)
.unwrap();
let mut next_segment = OpenBufferSegment::new(
SegmentId::new(2),
SegmentRange::test_range().next(),
catalog.sequence_number(),
segment_writer,
None,
);
let loaded_state = load_starting_state(persister, Some(wal)).await.unwrap();
next_segment.write_wal_ops(vec![wal_op]).unwrap();
next_segment.buffer_writes(write_batch).unwrap();
// now load up with a start time that puts us in the next segment period
let loaded_state = load_starting_state(
persister,
Some(wal),
Time::from_timestamp(6 * 60, 0).unwrap(),
SegmentDuration::new_5m(),
)
.await
.unwrap();
// verify that the persisted segment doesn't show up as one that should be persisting
assert!(loaded_state.persisting_buffer_segments.is_empty());
// verify the data was persisted
assert_eq!(
loaded_state.persisted_segments[0],
PersistedSegment {
segment_id,
segment_wal_size_bytes: 201,
segment_wal_size_bytes: 227,
segment_parquet_size_bytes: 3458,
segment_row_count: 3,
segment_min_time: 10,
@ -349,7 +430,7 @@ mod tests {
TableParquetFiles {
table_name: "cpu".to_string(),
parquet_files: vec![ParquetFile {
path: "dbs/db1/cpu/1970-01-01/4294967294.parquet"
path: "dbs/db1/cpu/1970-01-01T00-00/4294967294.parquet"
.to_string(),
size_bytes: 1721,
row_count: 1,
@ -364,7 +445,7 @@ mod tests {
TableParquetFiles {
table_name: "mem".to_string(),
parquet_files: vec![ParquetFile {
path: "dbs/db1/mem/1970-01-01/4294967294.parquet"
path: "dbs/db1/mem/1970-01-01T00-00/4294967294.parquet"
.to_string(),
size_bytes: 1737,
row_count: 2,
@ -386,12 +467,9 @@ mod tests {
assert!(db.tables.contains_key("foo"));
let cpu_table = db.get_table("cpu").unwrap();
let cpu_data = open_segment
let cpu_data = loaded_state.open_segments[0]
.table_buffer(db_name, "cpu")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&cpu_table.schema, cpu_table.columns());
let expected = vec![
"+-----+----------+--------------------------------+",
@ -403,12 +481,9 @@ mod tests {
assert_batches_eq!(&expected, &[cpu_data]);
let foo_table = db.get_table("foo").unwrap();
let foo_data = open_segment
let foo_data = loaded_state.open_segments[0]
.table_buffer(db_name, "foo")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&foo_table.schema, foo_table.columns());
let expected = vec![
"+--------------------------------+-----+",
@ -418,6 +493,8 @@ mod tests {
"+--------------------------------+-----+",
];
assert_batches_eq!(&expected, &[foo_data]);
assert_eq!(loaded_state.last_segment_id, SegmentId::new(2));
}
#[tokio::test]
@ -430,11 +507,17 @@ mod tests {
let LoadedState {
catalog,
mut open_segment,
mut open_segments,
..
} = load_starting_state(Arc::clone(&persister), Some(Arc::clone(&wal)))
.await
.unwrap();
} = load_starting_state(
Arc::clone(&persister),
Some(Arc::clone(&wal)),
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
)
.await
.unwrap();
let mut current_segment = open_segments.pop().unwrap();
let lp = "cpu,tag1=cupcakes bar=1 10\nmem,tag2=turtles bar=3 15\nmem,tag2=snakes bar=2 20";
@ -446,20 +529,16 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
current_segment.write_wal_ops(vec![wal_op]).unwrap();
current_segment.buffer_writes(write_batch).unwrap();
let next_segment_id = open_segment.segment_id().next();
let next_segment_id = current_segment.segment_id().next();
let next_segment_range = current_segment.segment_range().next();
let closed_segment = open_segment.into_closed_segment(Arc::clone(&catalog));
let mut open_segment = OpenBufferSegment::new(
next_segment_id,
catalog.sequence_number(),
wal.open_segment_writer(next_segment_id).unwrap(),
None,
);
// close the current segment
let closed_segment = current_segment.into_closed_segment(Arc::clone(&catalog));
// write data into the next segment
let lp = "cpu,tag1=apples bar=3 20\nfoo val=1 123";
let wal_op = WalOp::LpWrite(LpWriteOp {
@ -470,10 +549,31 @@ mod tests {
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
open_segment.write_batch(vec![wal_op]).unwrap();
open_segment.buffer_writes(write_batch).unwrap();
let segment_writer = wal
.new_segment_writer(next_segment_id, next_segment_range)
.unwrap();
let mut next_segment = OpenBufferSegment::new(
SegmentId::new(2),
SegmentRange::test_range().next(),
catalog.sequence_number(),
segment_writer,
None,
);
let loaded_state = load_starting_state(persister, Some(wal)).await.unwrap();
next_segment.write_wal_ops(vec![wal_op]).unwrap();
next_segment.buffer_writes(write_batch).unwrap();
// now load up with a start time that puts us in the next segment period. We should now
// have the previous current_segment in persisting, the previous next_segment as the
// new current_segment, and a brand new next_segment
let loaded_state = load_starting_state(
persister,
Some(wal),
Time::from_timestamp(6 * 60, 0).unwrap(),
SegmentDuration::new_5m(),
)
.await
.unwrap();
assert_eq!(loaded_state.persisting_buffer_segments.len(), 1);
let loaded_closed_segment = &loaded_state.persisting_buffer_segments[0];
@ -498,12 +598,9 @@ mod tests {
assert!(db.tables.contains_key("foo"));
let cpu_table = db.get_table("cpu").unwrap();
let cpu_data = open_segment
let cpu_data = loaded_state.open_segments[0]
.table_buffer(db_name, "cpu")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&cpu_table.schema, cpu_table.columns());
let expected = vec![
"+-----+--------+--------------------------------+",
@ -515,12 +612,9 @@ mod tests {
assert_batches_eq!(&expected, &[cpu_data]);
let foo_table = db.get_table("foo").unwrap();
let foo_data = open_segment
let foo_data = loaded_state.open_segments[0]
.table_buffer(db_name, "foo")
.unwrap()
.partition_buffers
.get("1970-01-01")
.unwrap()
.rows_to_record_batch(&foo_table.schema, foo_table.columns());
let expected = vec![
"+--------------------------------+-----+",
@ -530,5 +624,7 @@ mod tests {
"+--------------------------------+-----+",
];
assert_batches_eq!(&expected, &[foo_data]);
assert_eq!(loaded_state.last_segment_id, SegmentId::new(2));
}
}


@ -5,19 +5,21 @@ mod flusher;
mod loader;
use crate::catalog::{Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME};
use crate::write_buffer::buffer_segment::{ClosedBufferSegment, OpenBufferSegment, TableBuffer};
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::buffer_segment::{
ClosedBufferSegment, OpenBufferSegment, TableBuffer, WriteBatch,
};
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::write_buffer::loader::load_starting_state;
use crate::{
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, Precision,
SegmentId, Wal, WalOp, WriteBuffer, WriteLineError,
wal, BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister,
Precision, SegmentDuration, SegmentId, SegmentRange, Wal, WalOp, WriteBuffer, WriteLineError,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use data_types::{
column_type_from_field, ChunkId, ChunkOrder, ColumnType, NamespaceName, PartitionKey, TableId,
TransitionPartitionId,
column_type_from_field, ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError,
TableId, TransitionPartitionId,
};
use datafusion::common::{DataFusionError, Statistics};
use datafusion::execution::context::SessionState;
@ -25,7 +27,8 @@ use datafusion::logical_expr::Expr;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::{QueryChunk, QueryChunkData};
use observability_deps::tracing::{debug, info};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::{debug, error, info};
use parking_lot::RwLock;
use schema::sort::SortKey;
use schema::Schema;
@ -58,10 +61,21 @@ pub enum Error {
#[error("error from persister: {0}")]
PersisterError(#[from] crate::persister::Error),
#[error("corrupt load state: {0}")]
CorruptLoadState(String),
#[error("database name error: {0}")]
DatabaseNameError(#[from] NamespaceNameError),
#[error("walop in file {0} contained data for more than one segment, which is invalid")]
WalOpForMultipleSegments(String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
const OPEN_SEGMENT_LIMIT: usize = 98;
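// The commit summary mentions a cap of 100 segments writable at once; the
// constant enforced here is 98, checked against the total number of open
// segments in `get_or_create_segment_for_time` below. Presumably it was
// chosen so that, together with the current and next segments, roughly 100
// segments can be open at a time.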
#[derive(Debug)]
pub struct WriteRequest<'a> {
pub db_name: NamespaceName<'static>,
@ -70,38 +84,133 @@ pub struct WriteRequest<'a> {
}
#[derive(Debug)]
pub struct WriteBufferImpl<W> {
pub struct WriteBufferImpl<W, T> {
catalog: Arc<Catalog>,
segment_state: Arc<RwLock<SegmentState>>,
segment_state: Arc<RwLock<SegmentState<W>>>,
#[allow(dead_code)]
wal: Option<Arc<W>>,
write_buffer_flusher: WriteBufferFlusher,
segment_duration: SegmentDuration,
#[allow(dead_code)]
time_provider: Arc<T>,
}
#[derive(Debug)]
struct SegmentState {
open_segment: OpenBufferSegment,
struct SegmentState<W> {
segment_duration: SegmentDuration,
last_segment_id: SegmentId,
catalog: Arc<Catalog>,
wal: Option<Arc<W>>,
// Map of segment start times to open segments. There should always be a
// segment open for the period that the current time falls into.
segments: BTreeMap<Time, OpenBufferSegment>,
#[allow(dead_code)]
persisting_segments: Vec<ClosedBufferSegment>,
}
impl SegmentState {
pub fn new(open_segment: OpenBufferSegment) -> Self {
impl<W: Wal> SegmentState<W> {
pub fn new(
segment_duration: SegmentDuration,
last_segment_id: SegmentId,
catalog: Arc<Catalog>,
open_segments: Vec<OpenBufferSegment>,
persisting_segments: Vec<ClosedBufferSegment>,
wal: Option<Arc<W>>,
) -> Self {
let segments = open_segments
.into_iter()
.map(|s| (s.segment_range().start_time, s))
.collect();
Self {
open_segment,
persisting_segments: vec![],
segment_duration,
last_segment_id,
catalog,
wal,
segments,
persisting_segments,
}
}
pub(crate) fn write_ops_to_segment(
&mut self,
segment_start: Time,
ops: Vec<WalOp>,
) -> wal::Result<()> {
let segment = self.get_or_create_segment_for_time(segment_start)?;
segment.write_wal_ops(ops)
}
pub(crate) fn write_batch_to_segment(
&mut self,
segment_start: Time,
write_batch: WriteBatch,
) -> Result<()> {
let segment = self.get_or_create_segment_for_time(segment_start)?;
segment.buffer_writes(write_batch)
}
#[allow(dead_code)]
pub(crate) fn segment_for_time(&self, time: Time) -> Option<&OpenBufferSegment> {
self.segments.get(&time)
}
// return the segment with this start time or open up a new one if it isn't currently open.
fn get_or_create_segment_for_time(
&mut self,
time: Time,
) -> wal::Result<&mut OpenBufferSegment> {
if !self.segments.contains_key(&time) {
if self.segments.len() >= OPEN_SEGMENT_LIMIT {
return Err(wal::Error::OpenSegmentLimitReached(OPEN_SEGMENT_LIMIT));
}
self.last_segment_id = self.last_segment_id.next();
let segment_id = self.last_segment_id;
let segment_range =
SegmentRange::from_time_and_duration(time, self.segment_duration, false);
let segment_writer = match &self.wal {
Some(wal) => wal.new_segment_writer(segment_id, segment_range)?,
None => Box::new(WalSegmentWriterNoopImpl::new(segment_id)),
};
let segment = OpenBufferSegment::new(
segment_id,
segment_range,
self.catalog.sequence_number(),
segment_writer,
None,
);
self.segments.insert(time, segment);
}
Ok(self.segments.get_mut(&time).unwrap())
}
}
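// --- Sketch (standalone analog): routing a write to its segment bucket, as
// `get_or_create_segment_for_time` does above. A `BTreeMap` keyed by segment
// start keeps open segments ordered by time; `i64` seconds and `String` ops
// are illustrative stand-ins for the crate's `Time` and buffered writes.
use std::collections::BTreeMap;

fn route_write(
    segments: &mut BTreeMap<i64, Vec<String>>,
    t_secs: i64,
    duration_secs: i64,
    op: String,
) {
    // floor the write's timestamp to its segment start
    let start = t_secs - t_secs.rem_euclid(duration_secs);
    segments.entry(start).or_default().push(op);
}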
impl<W: Wal> WriteBufferImpl<W> {
pub async fn new<P>(persister: Arc<P>, wal: Option<Arc<W>>) -> Result<Self>
impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
pub async fn new<P>(
persister: Arc<P>,
wal: Option<Arc<W>>,
time_provider: Arc<T>,
segment_duration: SegmentDuration,
) -> Result<Self>
where
P: Persister,
Error: From<P::Error>,
{
let loaded_state = load_starting_state(persister, wal.clone()).await?;
let segment_state = Arc::new(RwLock::new(SegmentState::new(loaded_state.open_segment)));
let now = time_provider.now();
let loaded_state =
load_starting_state(persister, wal.clone(), now, segment_duration).await?;
let segment_state = Arc::new(RwLock::new(SegmentState::new(
segment_duration,
loaded_state.last_segment_id,
Arc::clone(&loaded_state.catalog),
loaded_state.open_segments,
loaded_state.persisting_buffer_segments,
wal.clone(),
)));
let write_buffer_flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));
@ -110,6 +219,8 @@ impl<W: Wal> WriteBufferImpl<W> {
segment_state,
wal,
write_buffer_flusher,
time_provider,
segment_duration,
})
}
@ -121,30 +232,24 @@ impl<W: Wal> WriteBufferImpl<W> {
&self,
db_name: NamespaceName<'static>,
lp: &str,
default_time: i64,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
debug!("write_lp to {} in writebuffer", db_name);
let result = parse_validate_and_update_catalog(
db_name.as_str(),
db_name.clone(),
lp,
&self.catalog,
default_time,
ingest_time,
self.segment_duration,
accept_partial,
precision,
)?;
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: result.lp_valid,
default_time,
});
let write_summary = self
.write_buffer_flusher
.write_to_open_segment(db_name.clone(), result.table_batches, wal_op)
self.write_buffer_flusher
.write_to_open_segment(result.valid_segmented_data)
.await?;
Ok(BufferedWriteRequest {
@ -153,9 +258,6 @@ impl<W: Wal> WriteBufferImpl<W> {
line_count: result.line_count,
field_count: result.field_count,
tag_count: result.tag_count,
total_buffer_memory_used: write_summary.buffer_size,
segment_id: write_summary.segment_id,
sequence_number: write_summary.sequence_number,
})
}
@ -177,41 +279,46 @@ impl<W: Wal> WriteBufferImpl<W> {
.ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
let schema = table.schema.clone();
let table_buffer = self
.clone_table_buffer(database_name, table_name)
.unwrap_or_default();
let table_buffers = self.clone_table_buffers(database_name, table_name);
let chunks = table_buffers
.into_iter()
.map(|table_buffer| {
let batch = table_buffer.rows_to_record_batch(&schema, table.columns());
let batch_stats = create_chunk_statistics(
Some(table_buffer.row_count()),
&schema,
Some(table_buffer.timestamp_min_max()),
None,
);
let mut chunks = Vec::with_capacity(table_buffer.partition_buffers.len());
let chunk: Arc<dyn QueryChunk> = Arc::new(BufferChunk {
batches: vec![batch],
schema: schema.clone(),
stats: Arc::new(batch_stats),
partition_id: TransitionPartitionId::new(
TableId::new(0),
&table_buffer.segment_key,
),
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(0),
});
for (partition_key, partition_buffer) in table_buffer.partition_buffers {
let partition_key: PartitionKey = partition_key.into();
let batch = partition_buffer.rows_to_record_batch(&schema, table.columns());
let batch_stats = create_chunk_statistics(
Some(partition_buffer.row_count()),
&schema,
Some(partition_buffer.timestamp_min_max()),
None,
);
let chunk = BufferChunk {
batches: vec![batch],
schema: schema.clone(),
stats: Arc::new(batch_stats),
partition_id: TransitionPartitionId::new(TableId::new(0), &partition_key),
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(0),
};
chunks.push(Arc::new(chunk) as _);
}
chunk
})
.collect();
Ok(chunks)
}
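// Note: with segmentation, query-time chunking is one BufferChunk per open
// segment that holds the table (its partition id derived from that segment's
// segment_key), where previously it was one chunk per partition buffer of the
// single open segment.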
fn clone_table_buffer(&self, database_name: &str, table_name: &str) -> Option<TableBuffer> {
fn clone_table_buffers(&self, database_name: &str, table_name: &str) -> Vec<TableBuffer> {
let state = self.segment_state.read();
state.open_segment.table_buffer(database_name, table_name)
state
.segments
.values()
.filter_map(|segment| segment.table_buffer(database_name, table_name))
.collect::<Vec<_>>()
}
#[cfg(test)]
@ -220,30 +327,25 @@ impl<W: Wal> WriteBufferImpl<W> {
let table = db_schema.tables.get(table_name).unwrap();
let schema = table.schema.clone();
let table_buffer = self.clone_table_buffer(datbase_name, table_name).unwrap();
let mut batches = Vec::with_capacity(table_buffer.partition_buffers.len());
for (_, partition_buffer) in table_buffer.partition_buffers {
let batch = partition_buffer.rows_to_record_batch(&schema, table.columns());
batches.push(batch);
}
batches
let table_buffer = self.clone_table_buffers(datbase_name, table_name);
table_buffer
.into_iter()
.map(|table_buffer| table_buffer.rows_to_record_batch(&schema, table.columns()))
.collect()
}
}
#[async_trait]
impl<W: Wal> Bufferer for WriteBufferImpl<W> {
impl<W: Wal, T: TimeProvider> Bufferer for WriteBufferImpl<W, T> {
async fn write_lp(
&self,
database: NamespaceName<'static>,
lp: &str,
default_time: i64,
ingest_time: Time,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
self.write_lp(database, lp, default_time, accept_partial, precision)
self.write_lp(database, lp, ingest_time, accept_partial, precision)
.await
}
@ -268,7 +370,7 @@ impl<W: Wal> Bufferer for WriteBufferImpl<W> {
}
}
impl<W: Wal> ChunkContainer for WriteBufferImpl<W> {
impl<W: Wal, T: TimeProvider> ChunkContainer for WriteBufferImpl<W, T> {
fn get_table_chunks(
&self,
database_name: &str,
@ -281,7 +383,7 @@ impl<W: Wal> ChunkContainer for WriteBufferImpl<W> {
}
}
impl<W: Wal> WriteBuffer for WriteBufferImpl<W> {}
impl<W: Wal, T: TimeProvider> WriteBuffer for WriteBufferImpl<W, T> {}
#[derive(Debug)]
pub struct BufferChunk {
@ -343,22 +445,22 @@ impl QueryChunk for BufferChunk {
}
}
const YEAR_MONTH_DAY_TIME_FORMAT: &str = "%Y-%m-%d";
pub(crate) fn parse_validate_and_update_catalog(
db_name: &str,
db_name: NamespaceName<'static>,
lp: &str,
catalog: &Catalog,
default_time: i64,
ingest_time: Time,
segment_duration: SegmentDuration,
accept_partial: bool,
precision: Precision,
) -> Result<ValidationResult> {
let (sequence, db) = catalog.db_or_create(db_name)?;
let (sequence, db) = catalog.db_or_create(db_name.as_str())?;
let mut result = parse_validate_and_update_schema(
lp,
&db,
&Partitioner::new_per_day_partitioner(),
default_time,
db_name,
ingest_time,
segment_duration,
accept_partial,
precision,
)?;
@ -373,20 +475,21 @@ pub(crate) fn parse_validate_and_update_catalog(
}
/// Takes &str of line protocol, parses lines, validates the schema, and inserts new columns
/// and partitions if present. Assigns the default time to any lines that do not include a time
/// if present. Assigns the default time to any lines that do not include a time
pub(crate) fn parse_validate_and_update_schema(
lp: &str,
schema: &DatabaseSchema,
partitioner: &Partitioner,
default_time: i64,
db_name: NamespaceName<'static>,
ingest_time: Time,
segment_duration: SegmentDuration,
accept_partial: bool,
precision: Precision,
) -> Result<ValidationResult> {
let mut lines = vec![];
let mut errors = vec![];
let mut valid_lines = vec![];
let mut lp_lines = lp.lines();
let mut valid_parsed_and_raw_lines: Vec<(ParsedLine, &str)> = vec![];
for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
let line = match maybe_line {
Ok(line) => line,
@ -413,16 +516,21 @@ pub(crate) fn parse_validate_and_update_schema(
};
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
valid_lines.push(lp_lines.next().unwrap());
lines.push(line);
valid_parsed_and_raw_lines.push((line, lp_lines.next().unwrap()));
}
validate_or_insert_schema_and_partitions(lines, schema, partitioner, default_time, precision)
.map(move |mut result| {
result.lp_valid = valid_lines.join("\n");
result.errors = errors;
result
})
validate_or_insert_schema_and_partitions(
valid_parsed_and_raw_lines,
schema,
db_name,
ingest_time,
segment_duration,
precision,
)
.map(move |mut result| {
result.errors = errors;
result
})
}
/// Takes parsed lines, validates their schema. If new tables or columns are defined, they
@ -430,32 +538,34 @@ pub(crate) fn parse_validate_and_update_schema(
/// into partitions and the validation result contains the data that can then be serialized
/// into the WAL.
pub(crate) fn validate_or_insert_schema_and_partitions(
lines: Vec<ParsedLine<'_>>,
lines: Vec<(ParsedLine<'_>, &str)>,
schema: &DatabaseSchema,
partitioner: &Partitioner,
default_time: i64,
db_name: NamespaceName<'static>,
ingest_time: Time,
segment_duration: SegmentDuration,
precision: Precision,
) -> Result<ValidationResult> {
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
// The parsed and validated table_batches
let mut table_batches: HashMap<String, TableBatch> = HashMap::new();
let mut segment_table_batches: HashMap<Time, TableBatchMap> = HashMap::new();
let line_count = lines.len();
let mut field_count = 0;
let mut tag_count = 0;
for line in lines.into_iter() {
for (line, raw_line) in lines.into_iter() {
field_count += line.field_set.len();
tag_count += line.series.tag_set.as_ref().map(|t| t.len()).unwrap_or(0);
validate_and_convert_parsed_line(
line,
&mut table_batches,
raw_line,
&mut segment_table_batches,
&mut schema,
partitioner,
default_time,
ingest_time,
segment_duration,
precision,
)?;
}
@ -465,25 +575,39 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
Cow::Borrowed(_) => None,
};
let valid_segmented_data = segment_table_batches
.into_iter()
.map(|(segment_start, table_batches)| ValidSegmentedData {
database_name: db_name.clone(),
segment_start,
table_batches: table_batches.table_batches,
wal_op: WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: table_batches.lines.join("\n"),
default_time: ingest_time.timestamp_nanos(),
}),
})
.collect();
Ok(ValidationResult {
schema,
table_batches,
line_count,
field_count,
tag_count,
errors: vec![],
lp_valid: String::new(),
valid_segmented_data,
})
}
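// --- Worked example (standalone, hypothetical values): with a 5m (300s)
// segment duration, one write whose lines carry timestamps 10ns and 360s
// yields two `ValidSegmentedData` entries, since the lines floor to segment
// starts 0s and 300s respectively.
fn segment_start_secs(ts_nanos: i64, duration_secs: i64) -> i64 {
    let secs = ts_nanos / 1_000_000_000;
    secs - secs.rem_euclid(duration_secs)
}

#[test]
fn one_write_can_span_two_segments() {
    assert_eq!(segment_start_secs(10, 300), 0);
    assert_eq!(segment_start_secs(360_000_000_000, 300), 300);
}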
// &mut Cow is used to avoid a copy, so allow it
#[allow(clippy::ptr_arg)]
fn validate_and_convert_parsed_line(
fn validate_and_convert_parsed_line<'a>(
line: ParsedLine<'_>,
table_batches: &mut HashMap<String, TableBatch>,
raw_line: &'a str,
segment_table_batches: &mut HashMap<Time, TableBatchMap<'a>>,
schema: &mut Cow<'_, DatabaseSchema>,
partitioner: &Partitioner,
default_time: i64,
ingest_time: Time,
segment_duration: SegmentDuration,
precision: Precision,
) -> Result<()> {
let table_name = line.series.measurement.as_str();
@ -537,8 +661,6 @@ fn validate_and_convert_parsed_line(
}
};
let partition_key = partitioner.partition_key_for_line(&line, default_time);
// now that we've ensured all columns exist in the schema, construct the actual row and values
// while validating the column types match.
let mut values = Vec::with_capacity(line.column_count() + 1);
@ -571,7 +693,7 @@ fn validate_and_convert_parsed_line(
}
// set the time value
let time_value = line
let time_value_nanos = line
.timestamp
.map(|ts| {
let multiplier = match precision {
@ -591,24 +713,28 @@ fn validate_and_convert_parsed_line(
ts * multiplier
})
.unwrap_or(default_time);
.unwrap_or(ingest_time.timestamp_nanos());
let segment_start = segment_duration.start_time(time_value_nanos / 1_000_000_000);
values.push(Field {
name: TIME_COLUMN_NAME.to_string(),
value: FieldData::Timestamp(time_value),
value: FieldData::Timestamp(time_value_nanos),
});
let table_batch = table_batches.entry(table_name.to_string()).or_default();
let partition_batch = table_batch
.partition_batches
.entry(partition_key)
.or_default();
let table_batch_map = segment_table_batches.entry(segment_start).or_default();
// insert the row into the partition batch
partition_batch.rows.push(Row {
time: time_value,
let table_batch = table_batch_map
.table_batches
.entry(table_name.to_string())
.or_default();
table_batch.rows.push(Row {
time: time_value_nanos,
fields: values,
});
table_batch_map.lines.push(raw_line);
Ok(())
}
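// --- Sketch (reduced for illustration): the timestamp normalization above.
// Raw timestamps are scaled to nanoseconds by precision before the segment
// start is derived; the crate's own `Precision` enum may carry more variants
// than shown here.
enum PrecisionSketch {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

fn to_nanos(ts: i64, precision: PrecisionSketch) -> i64 {
    let multiplier = match precision {
        PrecisionSketch::Second => 1_000_000_000,
        PrecisionSketch::Millisecond => 1_000_000,
        PrecisionSketch::Microsecond => 1_000,
        PrecisionSketch::Nanosecond => 1,
    };
    ts * multiplier
}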
@ -616,12 +742,6 @@ fn validate_and_convert_parsed_line(
pub(crate) struct TableBatch {
#[allow(dead_code)]
pub(crate) name: String,
// map of partition key to partition batch
pub(crate) partition_batches: HashMap<String, PartitionBatch>,
}
#[derive(Debug, Default)]
pub(crate) struct PartitionBatch {
pub(crate) rows: Vec<Row>,
}
@ -673,8 +793,6 @@ pub(crate) struct ValidationResult {
/// If the namespace schema is updated with new tables or columns it will be here, which
/// can be used to update the cache.
pub(crate) schema: Option<DatabaseSchema>,
/// Map of table name to TableBatch
pub(crate) table_batches: HashMap<String, TableBatch>,
/// Number of lines passed in
pub(crate) line_count: usize,
/// Number of fields passed in
@ -683,37 +801,23 @@ pub(crate) struct ValidationResult {
pub(crate) tag_count: usize,
/// Any errors that occurred while parsing the lines
pub(crate) errors: Vec<crate::WriteLineError>,
/// Only valid lines from what was passed in to validate
pub(crate) lp_valid: String,
/// Only valid lines from what was passed in to validate, segmented based on the
/// timestamps of the data.
pub(crate) valid_segmented_data: Vec<ValidSegmentedData>,
}
/// Generates the partition key for a given line or row
#[derive(Debug)]
pub struct Partitioner {
time_format: String,
pub(crate) struct ValidSegmentedData {
pub(crate) database_name: NamespaceName<'static>,
pub(crate) segment_start: Time,
pub(crate) table_batches: HashMap<String, TableBatch>,
pub(crate) wal_op: WalOp,
}
impl Partitioner {
/// Create a new time based partitioner using the time format
pub fn new_time_partitioner(time_format: impl Into<String>) -> Self {
Self {
time_format: time_format.into(),
}
}
/// Create a new time based partitioner that partitions by day
pub fn new_per_day_partitioner() -> Self {
Self::new_time_partitioner(YEAR_MONTH_DAY_TIME_FORMAT)
}
/// Given a parsed line and a default time, generate the string partition key
pub fn partition_key_for_line(&self, line: &ParsedLine<'_>, default_time: i64) -> String {
let timestamp = line.timestamp.unwrap_or(default_time);
format!(
"{}",
Utc.timestamp_nanos(timestamp).format(&self.time_format)
)
}
#[derive(Debug, Default)]
pub(crate) struct TableBatchMap<'a> {
pub(crate) lines: Vec<&'a str>,
pub(crate) table_batches: HashMap<String, TableBatch>,
}
#[cfg(test)]
@ -723,19 +827,21 @@ mod tests {
use crate::wal::WalImpl;
use crate::{SequenceNumber, WalOpBatch};
use arrow_util::assert_batches_eq;
use iox_time::{MockProvider, Time};
use object_store::memory::InMemory;
use object_store::ObjectStore;
#[test]
fn parse_lp_into_buffer() {
let db = Arc::new(DatabaseSchema::new("foo"));
let partitioner = Partitioner::new_per_day_partitioner();
let db_name = NamespaceName::new("foo").unwrap();
let lp = "cpu,region=west user=23.2 100\nfoo f1=1i";
let result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
0,
db_name,
Time::from_timestamp_nanos(0),
SegmentDuration::new_5m(),
false,
Precision::Nanosecond,
)
@ -754,15 +860,22 @@ mod tests {
let wal = WalImpl::new(dir.clone()).unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let write_buffer = WriteBufferImpl::new(Arc::clone(&persister), Some(Arc::new(wal)))
.await
.unwrap();
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let segment_duration = SegmentDuration::new_5m();
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Some(Arc::new(wal)),
Arc::clone(&time_provider),
segment_duration,
)
.await
.unwrap();
let summary = write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=1 10",
123,
Time::from_timestamp_nanos(123),
false,
Precision::Nanosecond,
)
@ -771,9 +884,6 @@ mod tests {
assert_eq!(summary.line_count, 1);
assert_eq!(summary.field_count, 1);
assert_eq!(summary.tag_count, 0);
assert_eq!(summary.total_buffer_memory_used, 1);
assert_eq!(summary.segment_id, SegmentId::new(1));
assert_eq!(summary.sequence_number, SequenceNumber::new(1));
// ensure the data is in the buffer
let actual = write_buffer.get_table_record_batches("foo", "cpu");
@ -801,9 +911,14 @@ mod tests {
assert_eq!(batch, expected_batch);
// ensure we load state from the persister
let write_buffer = WriteBufferImpl::new(persister, Some(Arc::new(wal)))
.await
.unwrap();
let write_buffer = WriteBufferImpl::new(
persister,
Some(Arc::new(wal)),
time_provider,
segment_duration,
)
.await
.unwrap();
let actual = write_buffer.get_table_record_batches("foo", "cpu");
assert_batches_eq!(&expected, &actual);
}