diff --git a/Cargo.lock b/Cargo.lock index 48cc819e4a..ed0c317cb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2219,6 +2219,7 @@ dependencies = [ "bytes", "criterion", "flate2", + "hashbrown", "influxdb_line_protocol", "mutable_batch", "schema", diff --git a/mutable_batch_lp/Cargo.toml b/mutable_batch_lp/Cargo.toml index d67f38159d..bc36496c3c 100644 --- a/mutable_batch_lp/Cargo.toml +++ b/mutable_batch_lp/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" description = "Conversion logic for line protocol -> MutableBatch" [dependencies] +hashbrown = "0.11" influxdb_line_protocol = { path = "../influxdb_line_protocol" } mutable_batch = { path = "../mutable_batch" } schema = { path = "../schema" } diff --git a/mutable_batch_lp/benches/write_lp.rs b/mutable_batch_lp/benches/write_lp.rs index 5bd7d5d1a3..0a52daf739 100644 --- a/mutable_batch_lp/benches/write_lp.rs +++ b/mutable_batch_lp/benches/write_lp.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use flate2::read::GzDecoder; -use mutable_batch_lp::lines_to_batch; +use mutable_batch_lp::lines_to_batches; fn generate_lp_bytes() -> Bytes { let raw = include_bytes!("../../tests/fixtures/lineproto/read_filter.lp.gz"); @@ -23,7 +23,9 @@ pub fn write_lp(c: &mut Criterion) { group.bench_function(BenchmarkId::from_parameter(count), |b| { b.iter(|| { for _ in 0..*count { - lines_to_batch(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap(); + let batches = + lines_to_batches(std::str::from_utf8(&lp_bytes).unwrap(), 0).unwrap(); + assert_eq!(batches.len(), 1); } }); }); diff --git a/mutable_batch_lp/src/lib.rs b/mutable_batch_lp/src/lib.rs index 0a5f7267a4..2f2446e808 100644 --- a/mutable_batch_lp/src/lib.rs +++ b/mutable_batch_lp/src/lib.rs @@ -11,6 +11,7 @@ clippy::clone_on_ref_ptr )] +use hashbrown::HashMap; use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine}; use mutable_batch::writer::Writer; use mutable_batch::MutableBatch; @@ -36,18 +37,26 @@ pub enum Error { /// Result type for line protocol conversion pub type Result<T, E = Error> = std::result::Result<T, E>; -/// Converts the provided lines of line protocol to a [`MutableBatch`] -pub fn lines_to_batch(lines: &str, default_time: i64) -> Result<MutableBatch> { - let mut batch = MutableBatch::new(); +/// Converts the provided lines of line protocol to a set of [`MutableBatch`] +/// keyed by measurement name +pub fn lines_to_batches(lines: &str, default_time: i64) -> Result<HashMap<String, MutableBatch>> { + let mut batches = HashMap::new(); for (line_idx, maybe_line) in parse_lines(lines).enumerate() { let line = maybe_line.context(LineProtocol { line: line_idx + 1 })?; + let measurement = line.series.measurement.as_str(); + + let (_, batch) = batches + .raw_entry_mut() + .from_key(measurement) + .or_insert_with(|| (measurement.to_string(), MutableBatch::new())); // TODO: Reuse writer - let mut writer = Writer::new(&mut batch, 1); + let mut writer = Writer::new(batch, 1); write_line(&mut writer, line, default_time).context(Write { line: line_idx + 1 })?; writer.commit(); } - Ok(batch) + + Ok(batches) } fn write_line( @@ -95,10 +104,14 @@ mod tests { fn test_basic() { let lp = r#"cpu,tag1=v1,tag2=v2 val=2i 0 cpu,tag1=v4,tag2=v1 val=2i 0 + mem,tag1=v2 ival=3i 0 cpu,tag2=v2 val=3i 1 - cpu,tag1=v1,tag2=v2 fval=2.0"#; + cpu,tag1=v1,tag2=v2 fval=2.0 + mem,tag1=v5 ival=2i 1 + "#; - let batch = lines_to_batch(lp, 5).unwrap(); + let batch = lines_to_batches(lp, 5).unwrap(); + assert_eq!(batch.len(), 2); assert_batches_eq!( &[ @@ -111,7 +124,19 @@ mod tests { "| 2 | v1 | v2 | 1970-01-01T00:00:00.000000005Z | |", "+------+------+------+--------------------------------+-----+", ], - &[batch.to_arrow(Selection::All).unwrap()] + &[batch["cpu"].to_arrow(Selection::All).unwrap()] + ); + + assert_batches_eq!( + &[ + "+------+------+--------------------------------+", + "| ival | tag1 | time |", + "+------+------+--------------------------------+", + "| 3 | v2 | 1970-01-01T00:00:00Z |", + "| 2 | v5 | 1970-01-01T00:00:00.000000001Z |", + "+------+------+--------------------------------+", + ], + &[batch["mem"].to_arrow(Selection::All).unwrap()] ); } }