diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index 7d4a4fe0d9..1ad7780acc 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -1,5 +1,5 @@
 use crate::{consistent_hasher::ConsistentHasher, server_id::ServerId, DatabaseName};
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{TimeZone, Utc};
 use influxdb_line_protocol::ParsedLine;
 use regex::Regex;
 use snafu::{OptionExt, Snafu};
@@ -57,11 +57,7 @@ pub struct DatabaseRules {
 }
 
 impl DatabaseRules {
-    pub fn partition_key(
-        &self,
-        line: &ParsedLine<'_>,
-        default_time: &DateTime<Utc>,
-    ) -> Result<String> {
+    pub fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
         self.partition_template.partition_key(line, default_time)
     }
 
@@ -82,16 +78,12 @@ impl DatabaseRules {
 
 /// Generates a partition key based on the line and the default time.
 pub trait Partitioner {
-    fn partition_key(
-        &self,
-        _line: &ParsedLine<'_>,
-        _default_time: &DateTime<Utc>,
-    ) -> Result<String>;
+    fn partition_key(&self, _line: &ParsedLine<'_>, _default_time: i64) -> Result<String>;
 }
 
 impl Partitioner for DatabaseRules {
-    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
-        self.partition_key(&line, &default_time)
+    fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
+        self.partition_key(&line, default_time)
     }
 }
 
@@ -291,7 +283,7 @@ pub struct PartitionTemplate {
 }
 
 impl Partitioner for PartitionTemplate {
-    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
+    fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
         let parts: Vec<_> = self
             .parts
             .iter()
@@ -304,10 +296,10 @@ impl Partitioner for PartitionTemplate {
                         None => "".to_string(),
                     },
                 },
-                TemplatePart::TimeFormat(format) => match line.timestamp {
-                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
-                    None => default_time.format(&format).to_string(),
-                },
+                TemplatePart::TimeFormat(format) => {
+                    let nanos = line.timestamp.unwrap_or(default_time);
+                    Utc.timestamp_nanos(nanos).format(&format).to_string()
+                }
                 _ => unimplemented!(),
             })
             .collect();
@@ -519,6 +511,8 @@ mod tests {
     use super::*;
     use influxdb_line_protocol::parse_lines;
 
+    const ARBITRARY_DEFAULT_TIME: i64 = 456;
+
     #[test]
     fn partition_key_with_table() {
         let template = PartitionTemplate {
@@ -526,7 +520,12 @@
         };
 
         let line = parse_line("cpu foo=1 10");
-        assert_eq!("cpu", template.partition_key(&line, &Utc::now()).unwrap());
+        assert_eq!(
+            "cpu",
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
+        );
     }
 
     #[test]
@@ -536,7 +535,12 @@
         };
 
         let line = parse_line("cpu foo=1 10");
-        assert_eq!("foo_1", template.partition_key(&line, &Utc::now()).unwrap());
+        assert_eq!(
+            "foo_1",
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
+        );
     }
 
     #[test]
@@ -548,7 +552,9 @@
         let line = parse_line("cpu foo=1.1 10");
         assert_eq!(
             "foo_1.1",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
 
     #[test]
@@ -561,7 +567,9 @@
         let line = parse_line("cpu foo=\"asdf\" 10");
         assert_eq!(
             "foo_asdf",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
 
     #[test]
@@ -574,7 +582,9 @@
         let line = parse_line("cpu bar=true 10");
         assert_eq!(
             "bar_true",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
 
@@ -587,7 +597,9 @@ mod tests {
         let line = parse_line("cpu,region=west usage_user=23.2 10");
         assert_eq!(
             "region_west",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
 
@@ -598,7 +610,12 @@
         };
 
         let line = parse_line("cpu,foo=asdf bar=true 10");
-        assert_eq!("", template.partition_key(&line, &Utc::now()).unwrap());
+        assert_eq!(
+            "",
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
+        );
     }
 
     #[test]
@@ -610,7 +627,9 @@
         let line = parse_line("cpu,foo=asdf bar=true 1602338097000000000");
         assert_eq!(
             "2020-10-10 13:54:57",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
 
@@ -625,7 +644,9 @@
         let line = parse_line("cpu,foo=asdf bar=true");
         assert_eq!(
             default_time.format(format_string).to_string(),
-            template.partition_key(&line, &default_time).unwrap()
+            template
+                .partition_key(&line, default_time.timestamp_nanos())
+                .unwrap()
         );
     }
 
@@ -645,7 +666,9 @@
         );
         assert_eq!(
             "cpu-region_west-usage_system_53.1-2020-10-10 13:54:57",
-            template.partition_key(&line, &Utc::now()).unwrap()
+            template
+                .partition_key(&line, ARBITRARY_DEFAULT_TIME)
+                .unwrap()
         );
     }
diff --git a/entry/benches/benchmark.rs b/entry/benches/benchmark.rs
index 74bde99eb4..6e6d7083eb 100644
--- a/entry/benches/benchmark.rs
+++ b/entry/benches/benchmark.rs
@@ -15,7 +15,9 @@ fn sequenced_entry(c: &mut Criterion) {
         .collect::<Result<Vec<_>, _>>()
         .unwrap();
     let shard_config: Option<&ShardConfig> = None;
-    let sharded_entries = lines_to_sharded_entries(&lines, shard_config, &partitioner(1)).unwrap();
+    let default_time = 456;
+    let sharded_entries =
+        lines_to_sharded_entries(&lines, default_time, shard_config, &partitioner(1)).unwrap();
     let entry = &sharded_entries.first().unwrap().entry;
     let data = entry.data();
     assert_eq!(
diff --git a/entry/src/entry.rs b/entry/src/entry.rs
index c226111e64..d241a93b71 100644
--- a/entry/src/entry.rs
+++ b/entry/src/entry.rs
@@ -9,7 +9,7 @@ use std::{
     num::NonZeroU64,
 };
 
-use chrono::{DateTime, Utc};
+use chrono::Utc;
 use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
 use ouroboros::self_referencing;
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -65,10 +65,10 @@ type ColumnResult<T> = std::result::Result<T, ColumnError>;
 /// underlying flatbuffers bytes generated.
 pub fn lines_to_sharded_entries(
     lines: &[ParsedLine<'_>],
+    default_time: i64,
     sharder: Option<&impl Sharder>,
     partitioner: &impl Partitioner,
 ) -> Result<Vec<ShardedEntry>> {
-    let default_time = Utc::now();
     let mut sharded_lines = BTreeMap::new();
 
     for line in lines {
@@ -77,7 +77,7 @@ pub fn lines_to_sharded_entries(
             None => None,
         };
         let partition_key = partitioner
-            .partition_key(line, &default_time)
+            .partition_key(line, default_time)
             .context(GeneratingPartitionKey)?;
         let table = line.series.measurement.as_str();
 
@@ -91,11 +91,9 @@ pub fn lines_to_sharded_entries(
             .push(line);
     }
 
-    let default_time = Utc::now();
-
     let sharded_entries = sharded_lines
         .into_iter()
-        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
+        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, default_time))
         .collect::<Result<Vec<_>>>()?;
 
     Ok(sharded_entries)
@@ -104,7 +102,7 @@ pub fn lines_to_sharded_entries(
 fn build_sharded_entry(
     shard_id: Option<ShardId>,
     partitions: BTreeMap<String, BTreeMap<&str, Vec<&ParsedLine<'_>>>>,
-    default_time: &DateTime<Utc>,
+    default_time: i64,
 ) -> Result<ShardedEntry> {
     let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024);
 
@@ -143,7 +141,7 @@ fn build_partition_write<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     partition_key: String,
     tables: BTreeMap<&str, Vec<&'a ParsedLine<'_>>>,
-    default_time: &DateTime<Utc>,
+    default_time: i64,
 ) -> Result<WIPOffset<entry_fb::PartitionWrite<'a>>> {
     let partition_key = fbb.create_string(&partition_key);
 
@@ -166,7 +164,7 @@ fn build_table_write_batch<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     table_name: &str,
     lines: Vec<&'a ParsedLine<'_>>,
-    default_time: &DateTime<Utc>,
+    default_time: i64,
 ) -> Result<WIPOffset<entry_fb::TableWriteBatch<'a>>> {
     let mut columns = BTreeMap::new();
     for (i, line) in lines.iter().enumerate() {
@@ -257,10 +255,7 @@ fn build_table_write_batch<'a>(
             .entry(TIME_COLUMN_NAME)
             .or_insert_with(ColumnBuilder::new_time_column);
         builder
-            .push_time(
-                line.timestamp
-                    .unwrap_or_else(|| default_time.timestamp_nanos()),
-            )
+            .push_time(line.timestamp.unwrap_or(default_time))
            .context(TableColumnTypeMismatch {
                 table: table_name,
                 column: TIME_COLUMN_NAME,
@@ -1530,11 +1525,18 @@ pub mod test_helpers {
     pub fn lp_to_entry(lp: &str) -> Entry {
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        lines_to_sharded_entries(&lines, sharder(1).as_ref(), &hour_partitioner())
-            .unwrap()
-            .pop()
-            .unwrap()
-            .entry
+        let default_time = Utc::now().timestamp_nanos();
+
+        lines_to_sharded_entries(
+            &lines,
+            default_time,
+            sharder(1).as_ref(),
+            &hour_partitioner(),
+        )
+        .unwrap()
+        .pop()
+        .unwrap()
+        .entry
     }
 
     /// Converts the line protocol to a collection of `Entry` with a single
@@ -1543,14 +1545,21 @@ pub mod test_helpers {
     pub fn lp_to_entries(lp: &str) -> Vec<Entry> {
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
+        let default_time = Utc::now().timestamp_nanos();
+
         lines
             .chunks(LP_BATCH_SIZE)
             .map(|batch| {
-                lines_to_sharded_entries(batch, sharder(1).as_ref(), &hour_partitioner())
-                    .unwrap()
-                    .pop()
-                    .unwrap()
-                    .entry
+                lines_to_sharded_entries(
+                    batch,
+                    default_time,
+                    sharder(1).as_ref(),
+                    &hour_partitioner(),
+                )
+                .unwrap()
+                .pop()
+                .unwrap()
+                .entry
             })
             .collect::<Vec<_>>()
     }
@@ -1620,7 +1629,7 @@ pub mod test_helpers {
         fn partition_key(
             &self,
             _line: &ParsedLine<'_>,
-            _default_time: &DateTime<Utc>,
+            _default_time: i64,
         ) -> data_types::database_rules::Result<String> {
             let n = *self.n.borrow();
             self.n.replace(n + 1);
@@ -1636,15 +1645,14 @@ pub mod test_helpers {
         fn partition_key(
             &self,
             line: &ParsedLine<'_>,
-            default_time: &DateTime<Utc>,
+            default_time: i64,
         ) -> data_types::database_rules::Result<String> {
             const HOUR_FORMAT: &str = "%Y-%m-%dT%H";
 
-            let key = match line.timestamp {
-                Some(t) => Utc.timestamp_nanos(t).format(HOUR_FORMAT),
-                None => default_time.format(HOUR_FORMAT),
-            }
-            .to_string();
+            let key = Utc
+                .timestamp_nanos(line.timestamp.unwrap_or(default_time))
+                .format(HOUR_FORMAT)
+                .to_string();
 
             Ok(key)
         }
@@ -1659,6 +1667,8 @@ mod tests {
     use super::test_helpers::*;
     use super::*;
 
+    const ARBITRARY_DEFAULT_TIME: i64 = 456;
+
     #[test]
     fn shards_lines() {
         let lp = vec![
@@ -1669,8 +1679,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(2).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(2).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         assert_eq!(sharded_entries.len(), 2);
         assert_eq!(sharded_entries[0].shard_id, Some(0));
@@ -1687,8 +1702,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            NO_SHARD_CONFIG,
+            &partitioner(1),
+        )
+        .unwrap();
 
         assert_eq!(sharded_entries.len(), 1);
         assert_eq!(sharded_entries[0].shard_id, None);
@@ -1704,8 +1724,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(2)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(2),
+        )
+        .unwrap();
 
         let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
         assert_eq!(partition_writes.len(), 2);
@@ -1725,8 +1750,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
         let table_batches = partition_writes[0].table_batches();
@@ -1742,8 +1772,13 @@ mod tests {
         let lp = vec!["a,host=a val=23 983", "a,host=a,region=west val2=23.2 2343"].join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
         let table_batches = partition_writes[0].table_batches();
@@ -1784,8 +1819,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let partition_writes = sharded_entries
             .first()
@@ -1855,8 +1895,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let partition_writes = sharded_entries
             .first()
@@ -1980,8 +2025,13 @@ mod tests {
         let lp = vec!["a val=1i 1"].join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
         let partition_writes = sharded_entries
             .first()
             .unwrap()
@@ -2011,8 +2061,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
         let partition_writes = sharded_entries
             .first()
             .unwrap()
@@ -2055,8 +2110,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
         let partition_writes = sharded_entries
             .first()
             .unwrap()
@@ -2092,10 +2152,11 @@ mod tests {
         let lp = vec!["a val=1i", "a val=2i 123"].join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let t = Utc::now().timestamp_nanos();
+        let default_time = Utc::now().timestamp_nanos();
 
         let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+            lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
+                .unwrap();
 
         let partition_writes = sharded_entries
             .first()
@@ -2110,17 +2171,74 @@ mod tests {
         let col = columns.get(0).unwrap();
         assert_eq!(col.name(), TIME_COLUMN_NAME);
         let values = col.values().i64_values().unwrap();
-        assert!(values[0].unwrap() > t);
+        assert_eq!(values[0].unwrap(), default_time);
         assert_eq!(values[1], Some(123));
     }
 
+    #[test]
+    fn missing_times_added_should_match_partition() {
+        use chrono::TimeZone;
+
+        let default_time = Utc
+            .datetime_from_str("2021-01-01 00:59:59 999999999", "%F %T %f")
+            .unwrap()
+            .timestamp_nanos();
+
+        // One point that has no timestamp
+        let lp = "a val=1i";
+        let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
+
+        // Partition on the hour
+        let hour_partitioner = hour_partitioner();
+
+        // Extract the partition key the partitioned write was assigned
+        let sharded_entries =
+            lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &hour_partitioner)
+                .unwrap();
+        let partition_writes = sharded_entries
+            .first()
+            .unwrap()
+            .entry
+            .partition_writes()
+            .unwrap();
+        let partition_write = partition_writes.first().unwrap();
+        let assigned_partition_key = partition_write.key();
+
+        // Extract the timestamp the point was assigned
+        let table_batches = partition_write.table_batches();
+        let batch = table_batches.first().unwrap();
+        let columns = batch.columns();
+        let col = columns.get(0).unwrap();
+        assert_eq!(col.name(), TIME_COLUMN_NAME);
+        let values = col.values().i64_values().unwrap();
+
+        // Recreate the `ParsedLine` with the assigned timestamp to re-partition this point
+        let lp_with_assigned_timestamp = format!("{} {}", lp, values[0].unwrap());
+        let reparsed_lines: Vec<_> = parse_lines(&lp_with_assigned_timestamp)
+            .map(|l| l.unwrap())
+            .collect();
+        let point_key = hour_partitioner
+            .partition_key(
+                &reparsed_lines[0],
+                ARBITRARY_DEFAULT_TIME, // shouldn't be used since line now has timestamp
+            )
+            .unwrap();
+
+        // The point and the partitioned write it's in should have the same partition key
+        assert_eq!(point_key, assigned_partition_key);
+    }
+
     #[test]
     fn field_type_conflict() {
         let lp = vec!["a val=1i 1", "a val=2.1 123"].join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1));
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        );
 
         assert!(sharded_entries.is_err());
     }
@@ -2130,8 +2248,12 @@ mod tests {
         let lp = vec!["a,host=a val=1i 1", "a host=\"b\" 123"].join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1));
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        );
 
         assert!(sharded_entries.is_err());
     }
@@ -2146,8 +2268,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let entry_bytes = sharded_entries.first().unwrap().entry.data();
         let clock_value = ClockValue::try_from(23).unwrap();
@@ -2231,8 +2358,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let entry_bytes = sharded_entries.first().unwrap().entry.data();
 
@@ -2271,8 +2403,13 @@ mod tests {
         .join("\n");
         let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
 
-        let sharded_entries =
-            lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            sharder(1).as_ref(),
+            &partitioner(1),
+        )
+        .unwrap();
 
         let entry_bytes = sharded_entries.first().unwrap().entry.data();
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 728de1d89b..9aa31df2d5 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -570,7 +570,15 @@ impl Server {
     /// of ShardedEntry which are then sent to other IOx servers based on
     /// the ShardConfig or sent to the local database for buffering in the
     /// WriteBuffer and/or the MutableBuffer if configured.
-    pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
+    ///
+    /// The provided `default_time` is nanoseconds since the epoch and will be assigned
+    /// to any lines that don't have a timestamp.
+    pub async fn write_lines(
+        &self,
+        db_name: &str,
+        lines: &[ParsedLine<'_>],
+        default_time: i64,
+    ) -> Result<()> {
         self.require_id()?;
 
         let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
@@ -589,8 +597,9 @@ impl Server {
         let rules = db.rules.read();
         let shard_config = &rules.shard_config;
 
-        let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
-            .context(LineConversion)?;
+        let sharded_entries =
+            lines_to_sharded_entries(lines, default_time, shard_config.as_ref(), &*rules)
+                .context(LineConversion)?;
 
         let shards = shard_config
             .as_ref()
@@ -1062,6 +1071,8 @@ mod tests {
     use std::sync::atomic::{AtomicBool, Ordering};
     use tempfile::TempDir;
 
+    const ARBITRARY_DEFAULT_TIME: i64 = 456;
+
     // TODO: perhaps switch to a builder pattern.
     fn config_with_metric_registry_and_store(
         object_store: ObjectStore,
@@ -1108,7 +1119,10 @@ mod tests {
         assert!(matches!(resp, Error::IdNotSet));
 
         let lines = parsed_lines("cpu foo=1 10");
-        let resp = server.write_lines("foo", &lines).await.unwrap_err();
+        let resp = server
+            .write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
+            .await
+            .unwrap_err();
         assert!(matches!(resp, Error::IdNotSet));
     }
 
@@ -1317,7 +1331,10 @@ mod tests {
         let line = "cpu bar=1 10";
         let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
-        server.write_lines("foo", &lines).await.unwrap();
+        server
+            .write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
+            .await
+            .unwrap();
 
         let db_name = DatabaseName::new("foo").unwrap();
         let db = server.db(&db_name).unwrap();
@@ -1357,8 +1374,13 @@ mod tests {
         let line = "cpu bar=1 10";
         let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
-        let sharded_entries = lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &*db.rules.read())
-            .expect("sharded entries");
+        let sharded_entries = lines_to_sharded_entries(
+            &lines,
+            ARBITRARY_DEFAULT_TIME,
+            NO_SHARD_CONFIG,
+            &*db.rules.read(),
+        )
+        .expect("sharded entries");
 
         let entry = &sharded_entries[0].entry;
         server
@@ -1457,14 +1479,20 @@ mod tests {
         let line = "cpu bar=1 10";
         let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
-        let err = server.write_lines(&db_name, &lines).await.unwrap_err();
+        let err = server
+            .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
+            .await
+            .unwrap_err();
         assert!(
             matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids)
         );
 
         // one remote is configured but it's down and we'll get connection error
         server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
-        let err = server.write_lines(&db_name, &lines).await.unwrap_err();
+        let err = server
+            .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
+            .await
+            .unwrap_err();
         assert!(matches!(
             err,
             Error::NoRemoteReachable { errors } if matches!(
@@ -1484,7 +1512,7 @@ mod tests {
         // probability both the remotes will get hit.
         for _ in 0..100 {
             server
-                .write_lines(&db_name, &lines)
+                .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
                 .await
                 .expect("cannot write lines");
         }
@@ -1514,7 +1542,10 @@ mod tests {
         let line = "cpu bar=1 10";
         let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
-        server.write_lines(&db_name, &lines).await.unwrap();
+        server
+            .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
+            .await
+            .unwrap();
 
         // start the close (note this is not an async)
         let partition_key = "";
@@ -1678,9 +1709,13 @@ mod tests {
         // inserting first line does not trigger hard buffer limit
         let line_1 = "cpu bar=1 10";
         let lines_1: Vec<_> = parse_lines(line_1).map(|l| l.unwrap()).collect();
-        let sharded_entries_1 =
-            lines_to_sharded_entries(&lines_1, NO_SHARD_CONFIG, &*db.rules.read())
-                .expect("first sharded entries");
+        let sharded_entries_1 = lines_to_sharded_entries(
+            &lines_1,
+            ARBITRARY_DEFAULT_TIME,
+            NO_SHARD_CONFIG,
+            &*db.rules.read(),
+        )
+        .expect("first sharded entries");
 
         let entry_1 = &sharded_entries_1[0].entry;
         server
@@ -1691,9 +1726,13 @@ mod tests {
         // inserting second line will
         let line_2 = "cpu bar=2 20";
         let lines_2: Vec<_> = parse_lines(line_2).map(|l| l.unwrap()).collect();
-        let sharded_entries_2 =
-            lines_to_sharded_entries(&lines_2, NO_SHARD_CONFIG, &*db.rules.read())
-                .expect("second sharded entries");
+        let sharded_entries_2 = lines_to_sharded_entries(
+            &lines_2,
+            ARBITRARY_DEFAULT_TIME,
+            NO_SHARD_CONFIG,
+            &*db.rules.read(),
+        )
+        .expect("second sharded entries");
         let entry_2 = &sharded_entries_2[0].entry;
         let res = server.write_entry("foo", entry_2.data().into()).await;
         assert!(matches!(res, Err(super::Error::HardLimitReached {})));
diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs
index a2c9c2589c..f3bfe9ad82 100644
--- a/src/influxdb_ioxd/http.rs
+++ b/src/influxdb_ioxd/http.rs
@@ -23,6 +23,7 @@ use server::{ConnectionManager, Server as AppServer};
 
 // External crates
 use bytes::{Bytes, BytesMut};
+use chrono::Utc;
 use futures::{self, StreamExt};
 use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
 use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
@@ -480,6 +481,10 @@ where
 
     let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
 
+    // The time, in nanoseconds since the epoch, to assign to any points that don't
+    // contain a timestamp
+    let default_time = Utc::now().timestamp_nanos();
+
     let mut num_fields = 0;
     let mut num_lines = 0;
 
@@ -501,43 +506,46 @@ where
         KeyValue::new("path", path),
     ];
 
-    server.write_lines(&db_name, &lines).await.map_err(|e| {
-        let labels = &[
-            metrics::KeyValue::new("status", "error"),
-            metrics::KeyValue::new("db_name", db_name.to_string()),
-        ];
-
-        server
-            .metrics
-            .ingest_lines_total
-            .add_with_labels(num_lines as u64, labels);
-
-        server
-            .metrics
-            .ingest_fields_total
-            .add_with_labels(num_fields as u64, labels);
-
-        server.metrics.ingest_points_bytes_total.add_with_labels(
-            body.len() as u64,
-            &[
-                metrics::KeyValue::new("status", "error"),
-                metrics::KeyValue::new("db_name", db_name.to_string()),
-            ],
-        );
-        debug!(?e, ?db_name, ?num_lines, "error writing lines");
-
-        obs.client_error_with_labels(&metric_kv); // user error
-        match e {
-            server::Error::DatabaseNotFound { .. } => ApplicationError::DatabaseNotFound {
-                name: db_name.to_string(),
-            },
-            _ => ApplicationError::WritingPoints {
-                org: write_info.org.clone(),
-                bucket_name: write_info.bucket.clone(),
-                source: Box::new(e),
-            },
-        }
-    })?;
+    server
+        .write_lines(&db_name, &lines, default_time)
+        .await
+        .map_err(|e| {
+            let labels = &[
+                metrics::KeyValue::new("status", "error"),
+                metrics::KeyValue::new("db_name", db_name.to_string()),
+            ];
+
+            server
+                .metrics
+                .ingest_lines_total
+                .add_with_labels(num_lines as u64, labels);
+
+            server
+                .metrics
+                .ingest_fields_total
+                .add_with_labels(num_fields as u64, labels);
+
+            server.metrics.ingest_points_bytes_total.add_with_labels(
+                body.len() as u64,
+                &[
+                    metrics::KeyValue::new("status", "error"),
+                    metrics::KeyValue::new("db_name", db_name.to_string()),
+                ],
+            );
+            debug!(?e, ?db_name, ?num_lines, "error writing lines");
+
+            obs.client_error_with_labels(&metric_kv); // user error
+            match e {
+                server::Error::DatabaseNotFound { .. } => ApplicationError::DatabaseNotFound {
+                    name: db_name.to_string(),
+                },
+                _ => ApplicationError::WritingPoints {
+                    org: write_info.org.clone(),
+                    bucket_name: write_info.bucket.clone(),
+                    source: Box::new(e),
+                },
+            }
+        })?;
 
     let labels = &[
         metrics::KeyValue::new("status", "ok"),
diff --git a/src/influxdb_ioxd/rpc/write.rs b/src/influxdb_ioxd/rpc/write.rs
index a67e492dc5..7d2ee1a831 100644
--- a/src/influxdb_ioxd/rpc/write.rs
+++ b/src/influxdb_ioxd/rpc/write.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use chrono::Utc;
 use generated_types::{google::FieldViolation, influxdata::iox::write::v1::*};
 use influxdb_line_protocol::parse_lines;
 use observability_deps::tracing::debug;
@@ -25,6 +26,10 @@ where
     ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
         let request = request.into_inner();
 
+        // The time, in nanoseconds since the epoch, to assign to any points that don't
+        // contain a timestamp
+        let default_time = Utc::now().timestamp_nanos();
+
         let db_name = request.db_name;
         let lp_data = request.lp_data;
         let lp_chars = lp_data.len();
@@ -40,7 +45,7 @@ where
         debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
 
         self.server
-            .write_lines(&db_name, &lines)
+            .write_lines(&db_name, &lines, default_time)
             .await
             .map_err(default_server_error_handler)?;
diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs
index 064f37db51..403efb82be 100644
--- a/tests/end_to_end_cases/write_api.rs
+++ b/tests/end_to_end_cases/write_api.rs
@@ -93,8 +93,10 @@ async fn test_write_entry() {
     let lp_data = vec!["cpu bar=1 10", "cpu bar=2 20"].join("\n");
     let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
 
+    let default_time = 456;
     let sharded_entries =
-        lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
+        lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
+            .unwrap();
 
     let entry: Vec<u8> = sharded_entries.into_iter().next().unwrap().entry.into();
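
Note on the resulting calling convention (an illustrative sketch, not part of the diff): callers are now expected to capture the default time exactly once, as i64 nanoseconds since the epoch, at the edge of the system (the HTTP and gRPC handlers above) and thread that one value through both partitioning and entry building. The `ingest` function below is hypothetical, and its import paths are assumed from the crate layout in this diff; `PartitionTemplate`, `TemplatePart`, `ShardConfig`, and `lines_to_sharded_entries` are the real items the diff touches.

use chrono::Utc;
use data_types::database_rules::{PartitionTemplate, ShardConfig, TemplatePart};
use entry::lines_to_sharded_entries;
use influxdb_line_protocol::parse_lines;

// Hypothetical caller: mirrors what write_lines and the benchmark now do.
fn ingest(lp: &str) {
    let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();

    // Capture "now" once. Previously lines_to_sharded_entries called
    // Utc::now() internally (twice, in fact), so a line with no timestamp
    // could be partitioned with one instant and stored with another.
    let default_time = Utc::now().timestamp_nanos();

    // Partition on the hour, like the hour_partitioner test helper above.
    let partitioner = PartitionTemplate {
        parts: vec![TemplatePart::TimeFormat("%Y-%m-%dT%H".to_string())],
    };
    let shard_config: Option<&ShardConfig> = None;

    // The same default_time now feeds both the partition key and the time
    // column, so the two cannot disagree for timestamp-less lines.
    let sharded_entries =
        lines_to_sharded_entries(&lines, default_time, shard_config, &partitioner).unwrap();
    assert_eq!(sharded_entries.len(), 1); // no shard config => single entry
}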
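The new missing_times_added_should_match_partition test pins the default time to the last nanosecond of an hour because that is exactly where the old double Utc::now() could split a point across a partition boundary. A self-contained sketch of that race follows; the hour_key helper is hypothetical and mirrors the test helper's HOUR_FORMAT.

use chrono::{TimeZone, Utc};

// Format a nanosecond timestamp the way the hour partitioner does.
fn hour_key(nanos: i64) -> String {
    Utc.timestamp_nanos(nanos).format("%Y-%m-%dT%H").to_string()
}

fn main() {
    // First Utc::now(): used for the partition key, just before the hour ends.
    let partition_instant = Utc
        .datetime_from_str("2021-01-01 00:59:59 999999999", "%F %T %f")
        .unwrap()
        .timestamp_nanos();
    // Second Utc::now(): used for the time column, one nanosecond later.
    let column_instant = partition_instant + 1;

    // The point would land in the T00 partition carrying a T01 timestamp.
    assert_eq!(hour_key(partition_instant), "2021-01-01T00");
    assert_eq!(hour_key(column_instant), "2021-01-01T01");
}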