feat: Store precision in WAL for replayability (#24966)

Up to this point we assumed that the precision for everything was nanoseconds.
While we do write and persist data as nanoseconds we made this assumption for
the WAL. However, we store the original line protocol data. If we want it to be
replayable we would need to include the precision and use that when loading the
WAL from disk. This commit changes the code to do that and we can see that that
data is definitely persisted, as the WAL is now bigger in the tests.
pull/24991/head
Michael Gattozzi 2024-05-08 13:05:24 -04:00 committed by GitHub
parent 9354c22f2c
commit 7a2867b98b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 111 additions and 6 deletions

View File

@ -447,6 +447,7 @@ pub struct LpWriteOp {
pub db_name: String,
pub lp: String,
pub default_time: i64,
pub precision: Precision,
}
/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
@ -533,8 +534,8 @@ impl ParquetFile {
}
}
/// The summary data for a persisted parquet file in a segment.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
/// The precision of the timestamp
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Auto,

View File

@ -660,7 +660,9 @@ fn segment_id_from_file_name(name: &str) -> Result<SegmentId> {
#[cfg(test)]
mod tests {
use super::*;
use crate::Catalog;
use crate::LpWriteOp;
use crate::Precision;
#[test]
fn segment_writer_reader() {
@ -673,6 +675,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});
writer.write_batch(vec![wal_op.clone()]).unwrap();
@ -690,6 +693,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});
// open the file, write and close it
@ -729,6 +733,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
});
let wal = WalImpl::new(dir.clone()).unwrap();
@ -752,4 +757,93 @@ mod tests {
assert_eq!(batch.ops, vec![wal_op.clone()]);
assert_eq!(batch.sequence_number, SequenceNumber::new(1));
}
#[test]
fn wal_written_and_read_with_different_precisions() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = WalImpl::new(dir.clone()).unwrap();
let wal_ops = vec![
WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu,host=a val=1i 1".to_string(),
default_time: 1,
precision: Precision::Second,
}),
WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu,host=b val=2i 1000".to_string(),
default_time: 1,
precision: Precision::Millisecond,
}),
WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu,host=c val=3i 1000000".to_string(),
default_time: 1,
precision: Precision::Microsecond,
}),
WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu,host=d val=4i 1000000000".to_string(),
default_time: 1,
precision: Precision::Nanosecond,
}),
WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu,host=e val=5i 1".to_string(),
default_time: 1,
precision: Precision::Auto,
}),
];
let segment = SegmentId::new(0);
// open the file, write and close it
{
let mut writer = wal
.new_segment_writer(segment, SegmentRange::test_range())
.unwrap();
writer.write_batch(wal_ops).unwrap();
// close the wal
drop(wal);
}
// Reopen the wal and make sure it loads the precision via
// `load_buffer_from_segment`
let catalog = Catalog::default();
let wal = WalImpl::new(dir).unwrap();
let schema = schema::SchemaBuilder::new()
.tag("host")
.influx_column(
"val",
schema::InfluxColumnType::Field(schema::InfluxFieldType::Integer),
)
.timestamp()
.build()
.unwrap();
// Load the data into a buffer.
let buffer = crate::write_buffer::buffer_segment::load_buffer_from_segment(
&catalog,
wal.open_segment_reader(segment).unwrap(),
)
.unwrap()
.0;
// Get the buffer data as record batches
let batch = buffer
.table_record_batches("foo", "cpu", schema.as_arrow(), &[])
.unwrap()
.unwrap();
let mut writer = arrow::json::LineDelimitedWriter::new(Vec::new());
writer.write_batches(&[&batch]).unwrap();
writer.finish().unwrap();
pretty_assertions::assert_eq!(
"{\"host\":\"a\",\"time\":\"1970-01-01T00:00:01\",\"val\":1}\n\
{\"host\":\"b\",\"time\":\"1970-01-01T00:00:01\",\"val\":2}\n\
{\"host\":\"c\",\"time\":\"1970-01-01T00:00:01\",\"val\":3}\n\
{\"host\":\"d\",\"time\":\"1970-01-01T00:00:01\",\"val\":4}\n\
{\"host\":\"e\",\"time\":\"1970-01-01T00:00:01\",\"val\":5}\n",
String::from_utf8(writer.into_inner()).unwrap()
)
}
}

View File

@ -12,8 +12,8 @@ use crate::write_buffer::{
};
use crate::{
wal, write_buffer, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment,
Persister, Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber,
TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter,
Persister, SegmentDuration, SegmentId, SegmentRange, SequenceNumber, TableParquetFiles, WalOp,
WalSegmentReader, WalSegmentWriter,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@ -196,7 +196,7 @@ pub(crate) fn load_buffer_from_segment(
Time::from_timestamp_nanos(write.default_time),
segment_duration,
false,
Precision::Nanosecond,
write.precision,
)?;
let db_name = &write.db_name;
@ -736,6 +736,7 @@ pub(crate) mod tests {
db_name: "db1".to_string(),
lp: lp.to_string(),
default_time: 0,
precision: crate::Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, "db1", lp);

View File

@ -145,6 +145,7 @@ mod tests {
use crate::persister::PersisterImpl;
use crate::test_helpers::lp_to_write_batch;
use crate::wal::{WalImpl, WalSegmentWriterNoopImpl};
use crate::Precision;
use crate::{
DatabaseTables, LpWriteOp, ParquetFile, SegmentRange, SequenceNumber, TableParquetFiles,
WalOp,
@ -180,6 +181,7 @@ mod tests {
db_name: "db1".to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, "db1", lp);
@ -267,6 +269,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
@ -352,6 +355,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
@ -379,6 +383,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
@ -417,7 +422,7 @@ mod tests {
loaded_state.persisted_segments[0],
PersistedSegment {
segment_id,
segment_wal_size_bytes: 227,
segment_wal_size_bytes: 252,
segment_parquet_size_bytes: 3458,
segment_row_count: 3,
segment_min_time: 10,
@ -526,6 +531,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
@ -546,6 +552,7 @@ mod tests {
db_name: db_name.to_string(),
lp: lp.to_string(),
default_time: 0,
precision: Precision::Nanosecond,
});
let write_batch = lp_to_write_batch(&catalog, db_name, lp);

View File

@ -577,6 +577,7 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
db_name: db_name.to_string(),
lp: table_batches.lines.join("\n"),
default_time: ingest_time.timestamp_nanos(),
precision,
}),
starting_catalog_sequence_number,
})
@ -928,6 +929,7 @@ mod tests {
db_name: "foo".to_string(),
lp: "cpu bar=1 10".to_string(),
default_time: 123,
precision: Precision::Nanosecond,
})],
};
assert_eq!(batch, expected_batch);