refactor: Rename level 0 to gen1 to match compaction wording (#25317)
parent
dd8f324728
commit
f8b6cfac5b
|
@ -14,7 +14,7 @@ use influxdb3_server::{
|
|||
auth::AllOrNothingAuthorizer, builder::ServerBuilder, query_executor::QueryExecutorImpl, serve,
|
||||
CommonServerState,
|
||||
};
|
||||
use influxdb3_wal::{Level0Duration, WalConfig};
|
||||
use influxdb3_wal::{Gen1Duration, WalConfig};
|
||||
use influxdb3_write::{persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer};
|
||||
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
|
||||
use iox_time::SystemProvider;
|
||||
|
@ -143,14 +143,16 @@ pub struct Config {
|
|||
pub bearer_token: Option<String>,
|
||||
|
||||
/// Duration that the Parquet files get arranged into. The data timestamps will land each
|
||||
/// row into a file of this duration. 1m, 5m, and 10m are supported.
|
||||
/// row into a file of this duration. 1m, 5m, and 10m are supported. These are known as
|
||||
/// "generation 1" files. The compactor in Pro can compact these into larger and longer
|
||||
/// generations.
|
||||
#[clap(
|
||||
long = "level-0-duration",
|
||||
env = "INFLUXDB3_LEVEL_0_DURATION",
|
||||
long = "gen1-duration",
|
||||
env = "INFLUXDB3_GEN1_DURATION",
|
||||
default_value = "10m",
|
||||
action
|
||||
)]
|
||||
pub level_0_duration: Level0Duration,
|
||||
pub gen1_duration: Gen1Duration,
|
||||
|
||||
/// Interval to flush buffered data to a wal file. Writes that wait for wal confirmation will
|
||||
/// take as long as this interval to complete.
|
||||
|
@ -303,7 +305,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
config.host_identifier_prefix,
|
||||
));
|
||||
let wal_config = WalConfig {
|
||||
level_0_duration: config.level_0_duration,
|
||||
gen1_duration: config.gen1_duration,
|
||||
max_write_buffer_size: config.wal_max_write_buffer_size,
|
||||
flush_interval: config.wal_flush_interval.into(),
|
||||
snapshot_size: config.wal_snapshot_size,
|
||||
|
|
|
@ -589,7 +589,7 @@ mod tests {
|
|||
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
|
||||
use futures::TryStreamExt;
|
||||
use influxdb3_catalog::catalog::Catalog;
|
||||
use influxdb3_wal::{Level0Duration, WalConfig};
|
||||
use influxdb3_wal::{Gen1Duration, WalConfig};
|
||||
use influxdb3_write::{
|
||||
last_cache::LastCacheProvider, persister::Persister, write_buffer::WriteBufferImpl,
|
||||
WriteBuffer,
|
||||
|
@ -641,7 +641,7 @@ mod tests {
|
|||
Arc::<MockProvider>::clone(&time_provider),
|
||||
Arc::clone(&executor),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
|
|
@ -42,8 +42,8 @@ pub enum Error {
|
|||
#[error("wal is shutdown and not accepting writes")]
|
||||
Shutdown,
|
||||
|
||||
#[error("invalid level 0 duration {0}. Must be one of 1m, 5m, 10m")]
|
||||
InvalidLevel0Duration(String),
|
||||
#[error("invalid gen1 duration {0}. Must be one of 1m, 5m, 10m")]
|
||||
InvalidGen1Duration(String),
|
||||
|
||||
#[error("last cache size must be from 1 to 10")]
|
||||
InvalidLastCacheSize,
|
||||
|
@ -115,7 +115,7 @@ pub trait WalFileNotifier: Debug + Send + Sync + 'static {
|
|||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct WalConfig {
|
||||
/// The duration of time of chunks to be persisted as Parquet files
|
||||
pub level_0_duration: Level0Duration,
|
||||
pub gen1_duration: Gen1Duration,
|
||||
/// The maximum number of writes that can be buffered before we must flush to a wal file
|
||||
pub max_write_buffer_size: usize,
|
||||
/// The interval at which to flush the buffer to a wal file
|
||||
|
@ -127,7 +127,7 @@ pub struct WalConfig {
|
|||
impl WalConfig {
|
||||
pub fn test_config() -> Self {
|
||||
Self {
|
||||
level_0_duration: Level0Duration::new_5m(),
|
||||
gen1_duration: Gen1Duration::new_5m(),
|
||||
max_write_buffer_size: 1000,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 100,
|
||||
|
@ -138,7 +138,7 @@ impl WalConfig {
|
|||
impl Default for WalConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
level_0_duration: Default::default(),
|
||||
gen1_duration: Default::default(),
|
||||
max_write_buffer_size: 100_000,
|
||||
flush_interval: Duration::from_secs(1),
|
||||
snapshot_size: 600,
|
||||
|
@ -148,9 +148,9 @@ impl Default for WalConfig {
|
|||
|
||||
/// The duration of data timestamps, grouped into files persisted into object storage.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct Level0Duration(Duration);
|
||||
pub struct Gen1Duration(Duration);
|
||||
|
||||
impl Level0Duration {
|
||||
impl Gen1Duration {
|
||||
pub fn duration_seconds(&self) -> i64 {
|
||||
self.0.as_secs() as i64
|
||||
}
|
||||
|
@ -160,7 +160,7 @@ impl Level0Duration {
|
|||
t.get() - (t.get() % self.0.as_nanos() as i64)
|
||||
}
|
||||
|
||||
/// Given a time, returns the start time of the level 0 chunk that contains the time.
|
||||
/// Given a time, returns the start time of the gen1 chunk that contains the time.
|
||||
pub fn start_time(&self, timestamp_seconds: i64) -> Time {
|
||||
let duration_seconds = self.duration_seconds();
|
||||
let rounded_seconds = (timestamp_seconds / duration_seconds) * duration_seconds;
|
||||
|
@ -184,7 +184,7 @@ impl Level0Duration {
|
|||
}
|
||||
}
|
||||
|
||||
impl FromStr for Level0Duration {
|
||||
impl FromStr for Gen1Duration {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
|
@ -192,12 +192,12 @@ impl FromStr for Level0Duration {
|
|||
"1m" => Ok(Self(Duration::from_secs(60))),
|
||||
"5m" => Ok(Self(Duration::from_secs(300))),
|
||||
"10m" => Ok(Self(Duration::from_secs(600))),
|
||||
_ => Err(Error::InvalidLevel0Duration(s.to_string())),
|
||||
_ => Err(Error::InvalidGen1Duration(s.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Level0Duration {
|
||||
impl Default for Gen1Duration {
|
||||
fn default() -> Self {
|
||||
Self(Duration::from_secs(600))
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ impl WalObjectStore {
|
|||
},
|
||||
SnapshotTracker::new(
|
||||
config.snapshot_size,
|
||||
config.level_0_duration,
|
||||
config.gen1_duration,
|
||||
last_snapshot_sequence_number,
|
||||
),
|
||||
)),
|
||||
|
@ -609,7 +609,7 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
Field, FieldData, Level0Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
|
||||
Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use object_store::memory::InMemory;
|
||||
|
@ -624,7 +624,7 @@ mod tests {
|
|||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_secs(1),
|
||||
snapshot_size: 2,
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
};
|
||||
let wal = WalObjectStore::new_without_replay(
|
||||
Arc::clone(&object_store),
|
||||
|
@ -833,7 +833,7 @@ mod tests {
|
|||
"my_host",
|
||||
Arc::clone(&replay_notifier),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 10,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 2,
|
||||
|
@ -1006,7 +1006,7 @@ mod tests {
|
|||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_secs(1),
|
||||
snapshot_size: 2,
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
};
|
||||
let wal = WalObjectStore::new_without_replay(
|
||||
Arc::clone(&object_store),
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
//! configured as it can be used to ensure that data in the write buffer is persisted in blocks
|
||||
//! that are not too large and unlikely to overlap.
|
||||
|
||||
use crate::{Level0Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use crate::{Gen1Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use data_types::Timestamp;
|
||||
|
||||
/// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
|
||||
|
@ -14,16 +14,16 @@ pub(crate) struct SnapshotTracker {
|
|||
last_wal_sequence_number: WalFileSequenceNumber,
|
||||
wal_periods: Vec<WalPeriod>,
|
||||
snapshot_size: usize,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
}
|
||||
|
||||
impl SnapshotTracker {
|
||||
/// Create a new `SnapshotTracker` with the given snapshot size and level 0 duration. The
|
||||
/// level 0 duration is the size of chunks in the write buffer that will be persisted as
|
||||
/// Create a new `SnapshotTracker` with the given snapshot size and gen1 duration. The
|
||||
/// gen1 duration is the size of chunks in the write buffer that will be persisted as
|
||||
/// parquet files.
|
||||
pub(crate) fn new(
|
||||
snapshot_size: usize,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
|
||||
) -> Self {
|
||||
Self {
|
||||
|
@ -31,7 +31,7 @@ impl SnapshotTracker {
|
|||
last_wal_sequence_number: WalFileSequenceNumber::default(),
|
||||
wal_periods: Vec::new(),
|
||||
snapshot_size,
|
||||
level_0_duration,
|
||||
gen1_duration,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,8 +74,8 @@ impl SnapshotTracker {
|
|||
.map(|period| period.max_time)
|
||||
.max()
|
||||
.unwrap();
|
||||
let t = max_time - (max_time.get() % self.level_0_duration.as_nanos())
|
||||
+ self.level_0_duration.as_nanos();
|
||||
let t = max_time - (max_time.get() % self.gen1_duration.as_nanos())
|
||||
+ self.gen1_duration.as_nanos();
|
||||
let last_wal_sequence_number = wal_periods.last().unwrap().wal_file_number;
|
||||
|
||||
let snapshot_details = SnapshotDetails {
|
||||
|
@ -91,8 +91,8 @@ impl SnapshotTracker {
|
|||
}
|
||||
|
||||
let t = self.wal_periods.last().unwrap().max_time;
|
||||
// round the last timestamp down to the level_0_duration
|
||||
let t = t - (t.get() % self.level_0_duration.as_nanos());
|
||||
// round the last timestamp down to the gen1_duration
|
||||
let t = t - (t.get() % self.gen1_duration.as_nanos());
|
||||
|
||||
// any wal period that has data before this time can be snapshot
|
||||
let periods_to_snapshot = self
|
||||
|
@ -119,7 +119,7 @@ impl SnapshotTracker {
|
|||
}
|
||||
|
||||
/// The number of wal periods we need to see before we attempt a snapshot. This is to ensure that we
|
||||
/// don't snapshot before we've buffered up enough data to fill a level 0 chunk.
|
||||
/// don't snapshot before we've buffered up enough data to fill a gen1 chunk.
|
||||
fn number_of_periods_to_snapshot_after(&self) -> usize {
|
||||
self.snapshot_size + self.snapshot_size / 2
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn snapshot() {
|
||||
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
|
||||
let mut tracker = SnapshotTracker::new(2, Gen1Duration::new_1m(), None);
|
||||
let p1 = WalPeriod::new(
|
||||
WalFileSequenceNumber::new(1),
|
||||
Timestamp::new(0),
|
||||
|
@ -261,7 +261,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn snapshot_future_data_forces_snapshot() {
|
||||
let mut tracker = SnapshotTracker::new(2, Level0Duration::new_1m(), None);
|
||||
let mut tracker = SnapshotTracker::new(2, Gen1Duration::new_1m(), None);
|
||||
let p1 = WalPeriod::new(
|
||||
WalFileSequenceNumber::new(1),
|
||||
Timestamp::new(0),
|
||||
|
|
|
@ -346,7 +346,7 @@ mod test_helpers {
|
|||
use crate::write_buffer::validator::WriteValidator;
|
||||
use crate::Precision;
|
||||
use data_types::NamespaceName;
|
||||
use influxdb3_wal::{Level0Duration, WriteBatch};
|
||||
use influxdb3_wal::{Gen1Duration, WriteBatch};
|
||||
use iox_time::Time;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -363,7 +363,7 @@ mod test_helpers {
|
|||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Level0Duration::new_5m(),
|
||||
Gen1Duration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ impl WriteBufferImpl {
|
|||
ingest_time.timestamp_nanos(),
|
||||
)?
|
||||
.v1_parse_lines_and_update_schema(lp, accept_partial)?
|
||||
.convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision);
|
||||
.convert_lines_to_buffer(ingest_time, self.wal_config.gen1_duration, precision);
|
||||
|
||||
// if there were catalog updates, ensure they get persisted to the wal, so they're
|
||||
// replayed on restart
|
||||
|
@ -248,7 +248,7 @@ impl WriteBufferImpl {
|
|||
ingest_time.timestamp_nanos(),
|
||||
)?
|
||||
.v3_parse_lines_and_update_schema(lp, accept_partial)?
|
||||
.convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision);
|
||||
.convert_lines_to_buffer(ingest_time, self.wal_config.gen1_duration, precision);
|
||||
|
||||
// if there were catalog updates, ensure they get persisted to the wal, so they're
|
||||
// replayed on restart
|
||||
|
@ -604,7 +604,7 @@ mod tests {
|
|||
use datafusion_util::config::register_iox_object_store;
|
||||
use futures_util::StreamExt;
|
||||
use influxdb3_catalog::catalog::SequenceNumber;
|
||||
use influxdb3_wal::{Level0Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use iox_query::exec::IOxSessionContext;
|
||||
use iox_time::{MockProvider, Time};
|
||||
use object_store::local::LocalFileSystem;
|
||||
|
@ -623,7 +623,7 @@ mod tests {
|
|||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Level0Duration::new_5m(),
|
||||
Gen1Duration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
|
@ -720,7 +720,7 @@ mod tests {
|
|||
Arc::clone(&time_provider),
|
||||
crate::test_help::make_exec(),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(50),
|
||||
snapshot_size: 100,
|
||||
|
@ -740,7 +740,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::new(InMemory::new()),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -775,7 +775,7 @@ mod tests {
|
|||
Arc::clone(&wbuf.time_provider),
|
||||
Arc::clone(&wbuf.buffer.executor),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -808,7 +808,7 @@ mod tests {
|
|||
Arc::clone(&wbuf.time_provider),
|
||||
Arc::clone(&wbuf.buffer.executor),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -862,7 +862,7 @@ mod tests {
|
|||
Arc::clone(&wbuf.time_provider),
|
||||
Arc::clone(&wbuf.buffer.executor),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -881,7 +881,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::new(InMemory::new()),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 2,
|
||||
|
@ -1014,7 +1014,7 @@ mod tests {
|
|||
Arc::clone(&write_buffer.time_provider),
|
||||
Arc::clone(&write_buffer.buffer.executor),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 2,
|
||||
|
@ -1081,7 +1081,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::new(InMemory::new()),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(5),
|
||||
snapshot_size: 1,
|
||||
|
@ -1195,7 +1195,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&object_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(5),
|
||||
snapshot_size: 1,
|
||||
|
@ -1284,7 +1284,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -1325,7 +1325,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -1356,7 +1356,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 2,
|
||||
|
@ -1408,7 +1408,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 2,
|
||||
|
@ -1442,7 +1442,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
@ -1486,7 +1486,7 @@ mod tests {
|
|||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
level_0_duration: Level0Duration::new_1m(),
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
|
|
|
@ -9,7 +9,7 @@ use influxdb3_catalog::catalog::{
|
|||
|
||||
use influxdb3_wal::{
|
||||
CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDataType, FieldDefinition,
|
||||
Level0Duration, Row, TableChunks, WriteBatch,
|
||||
Gen1Duration, Row, TableChunks, WriteBatch,
|
||||
};
|
||||
use influxdb_line_protocol::{parse_lines, v3, FieldValue, ParsedLine};
|
||||
use iox_time::Time;
|
||||
|
@ -545,12 +545,12 @@ impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
|
|||
/// be buffered and written to the WAL, if configured.
|
||||
///
|
||||
/// This involves splitting out the writes into different batches for each chunk, which will
|
||||
/// map to the `Level0Duration`. This function should be infallible, because
|
||||
/// map to the `Gen1Duration`. This function should be infallible, because
|
||||
/// the schema for incoming writes has been fully validated.
|
||||
pub(crate) fn convert_lines_to_buffer(
|
||||
self,
|
||||
ingest_time: Time,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
precision: Precision,
|
||||
) -> ValidatedLines {
|
||||
let mut table_chunks = HashMap::new();
|
||||
|
@ -571,7 +571,7 @@ impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
|
|||
line,
|
||||
&mut table_chunks,
|
||||
ingest_time,
|
||||
level_0_duration,
|
||||
gen1_duration,
|
||||
precision,
|
||||
);
|
||||
}
|
||||
|
@ -594,7 +594,7 @@ fn convert_v3_parsed_line(
|
|||
line: v3::ParsedLine<'_>,
|
||||
table_chunk_map: &mut HashMap<Arc<str>, TableChunks>,
|
||||
ingest_time: Time,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
precision: Precision,
|
||||
) {
|
||||
// Set up row values:
|
||||
|
@ -630,7 +630,7 @@ fn convert_v3_parsed_line(
|
|||
});
|
||||
|
||||
// Add the row into the correct chunk in the table
|
||||
let chunk_time = level_0_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
|
||||
let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
|
||||
let table_name: Arc<str> = line.series.measurement.to_string().into();
|
||||
let table_chunks = table_chunk_map.entry(Arc::clone(&table_name)).or_default();
|
||||
table_chunks.push_row(
|
||||
|
@ -647,12 +647,12 @@ impl<'lp> WriteValidator<LinesParsed<'lp, ParsedLine<'lp>>> {
|
|||
/// be buffered and written to the WAL, if configured.
|
||||
///
|
||||
/// This involves splitting out the writes into different batches for each chunk, which will
|
||||
/// map to the `Level0Duration`. This function should be infallible, because
|
||||
/// map to the `Gen1Duration`. This function should be infallible, because
|
||||
/// the schema for incoming writes has been fully validated.
|
||||
pub(crate) fn convert_lines_to_buffer(
|
||||
self,
|
||||
ingest_time: Time,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
precision: Precision,
|
||||
) -> ValidatedLines {
|
||||
let mut table_chunks = HashMap::new();
|
||||
|
@ -668,7 +668,7 @@ impl<'lp> WriteValidator<LinesParsed<'lp, ParsedLine<'lp>>> {
|
|||
line,
|
||||
&mut table_chunks,
|
||||
ingest_time,
|
||||
level_0_duration,
|
||||
gen1_duration,
|
||||
precision,
|
||||
);
|
||||
}
|
||||
|
@ -691,7 +691,7 @@ fn convert_v1_parsed_line(
|
|||
line: ParsedLine<'_>,
|
||||
table_chunk_map: &mut HashMap<Arc<str>, TableChunks>,
|
||||
ingest_time: Time,
|
||||
level_0_duration: Level0Duration,
|
||||
gen1_duration: Gen1Duration,
|
||||
precision: Precision,
|
||||
) {
|
||||
// now that we've ensured all columns exist in the schema, construct the actual row and values
|
||||
|
@ -731,7 +731,7 @@ fn convert_v1_parsed_line(
|
|||
.map(|ts| apply_precision_to_timestamp(precision, ts))
|
||||
.unwrap_or(ingest_time.timestamp_nanos());
|
||||
|
||||
let chunk_time = level_0_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
|
||||
let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
|
||||
|
||||
values.push(Field {
|
||||
name: TIME_COLUMN_NAME.to_string().into(),
|
||||
|
@ -774,7 +774,7 @@ mod tests {
|
|||
|
||||
use crate::{catalog::Catalog, write_buffer::Error, Precision};
|
||||
use data_types::NamespaceName;
|
||||
use influxdb3_wal::Level0Duration;
|
||||
use influxdb3_wal::Gen1Duration;
|
||||
use iox_time::Time;
|
||||
|
||||
use super::WriteValidator;
|
||||
|
@ -787,7 +787,7 @@ mod tests {
|
|||
.v1_parse_lines_and_update_schema("cpu,tag1=foo val1=\"bar\" 1234", false)?
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Level0Duration::new_5m(),
|
||||
Gen1Duration::new_5m(),
|
||||
Precision::Auto,
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue