feat: support multiple tables in lp -> MutableBatch conversion (#2959)

pull/24376/head
Raphael Taylor-Davies 2021-10-25 12:42:29 +01:00 committed by GitHub
parent a6f4078ba6
commit 820e0d56bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 10 deletions

1
Cargo.lock generated
View File

@ -2219,6 +2219,7 @@ dependencies = [
"bytes",
"criterion",
"flate2",
"hashbrown",
"influxdb_line_protocol",
"mutable_batch",
"schema",

View File

@ -5,6 +5,7 @@ edition = "2021"
description = "Conversion logic for line protocol -> MutableBatch"
[dependencies]
hashbrown = "0.11"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
mutable_batch = { path = "../mutable_batch" }
schema = { path = "../schema" }

View File

@ -4,7 +4,7 @@ use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use flate2::read::GzDecoder;
use mutable_batch_lp::lines_to_batch;
use mutable_batch_lp::lines_to_batches;
fn generate_lp_bytes() -> Bytes {
let raw = include_bytes!("../../tests/fixtures/lineproto/read_filter.lp.gz");
@ -23,7 +23,9 @@ pub fn write_lp(c: &mut Criterion) {
group.bench_function(BenchmarkId::from_parameter(count), |b| {
b.iter(|| {
for _ in 0..*count {
lines_to_batch(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
let batches =
lines_to_batches(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap();
assert_eq!(batches.len(), 1);
}
});
});

View File

@ -11,6 +11,7 @@
clippy::clone_on_ref_ptr
)]
use hashbrown::HashMap;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
use mutable_batch::writer::Writer;
use mutable_batch::MutableBatch;
@ -36,18 +37,26 @@ pub enum Error {
/// Result type for line protocol conversion
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts the provided lines of line protocol to a [`MutableBatch`]
pub fn lines_to_batch(lines: &str, default_time: i64) -> Result<MutableBatch> {
let mut batch = MutableBatch::new();
/// Converts the provided lines of line protocol to a set of [`MutableBatch`]
/// keyed by measurement name
pub fn lines_to_batches(lines: &str, default_time: i64) -> Result<HashMap<String, MutableBatch>> {
let mut batches = HashMap::new();
for (line_idx, maybe_line) in parse_lines(lines).enumerate() {
let line = maybe_line.context(LineProtocol { line: line_idx + 1 })?;
let measurement = line.series.measurement.as_str();
let (_, batch) = batches
.raw_entry_mut()
.from_key(measurement)
.or_insert_with(|| (measurement.to_string(), MutableBatch::new()));
// TODO: Reuse writer
let mut writer = Writer::new(&mut batch, 1);
let mut writer = Writer::new(batch, 1);
write_line(&mut writer, line, default_time).context(Write { line: line_idx + 1 })?;
writer.commit();
}
Ok(batch)
Ok(batches)
}
fn write_line(
@ -95,10 +104,14 @@ mod tests {
fn test_basic() {
let lp = r#"cpu,tag1=v1,tag2=v2 val=2i 0
cpu,tag1=v4,tag2=v1 val=2i 0
mem,tag1=v2 ival=3i 0
cpu,tag2=v2 val=3i 1
cpu,tag1=v1,tag2=v2 fval=2.0"#;
cpu,tag1=v1,tag2=v2 fval=2.0
mem,tag1=v5 ival=2i 1
"#;
let batch = lines_to_batch(lp, 5).unwrap();
let batch = lines_to_batches(lp, 5).unwrap();
assert_eq!(batch.len(), 2);
assert_batches_eq!(
&[
@ -111,7 +124,19 @@ mod tests {
"| 2 | v1 | v2 | 1970-01-01T00:00:00.000000005Z | |",
"+------+------+------+--------------------------------+-----+",
],
&[batch.to_arrow(Selection::All).unwrap()]
&[batch["cpu"].to_arrow(Selection::All).unwrap()]
);
assert_batches_eq!(
&[
"+------+------+--------------------------------+",
"| ival | tag1 | time |",
"+------+------+--------------------------------+",
"| 3 | v2 | 1970-01-01T00:00:00Z |",
"| 2 | v5 | 1970-01-01T00:00:00.000000001Z |",
"+------+------+--------------------------------+",
],
&[batch["mem"].to_arrow(Selection::All).unwrap()]
);
}
}