fix: Set default line timestamp and default partition time to same value (#1512)

* refactor: Rearrange to allow injection of the current time in tests

* test: Failing test showing a point can be in the wrong partition

* fix: Only get the default time once per ShardedEntry creation, in router
pull/24376/head
Carol (Nichols || Goulding) 2021-05-24 10:55:11 -04:00 committed by GitHub
parent 27e5b8fabf
commit 5c5064bdac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 361 additions and 145 deletions

View File

@ -1,5 +1,5 @@
use crate::{consistent_hasher::ConsistentHasher, server_id::ServerId, DatabaseName}; use crate::{consistent_hasher::ConsistentHasher, server_id::ServerId, DatabaseName};
use chrono::{DateTime, TimeZone, Utc}; use chrono::{TimeZone, Utc};
use influxdb_line_protocol::ParsedLine; use influxdb_line_protocol::ParsedLine;
use regex::Regex; use regex::Regex;
use snafu::{OptionExt, Snafu}; use snafu::{OptionExt, Snafu};
@ -57,11 +57,7 @@ pub struct DatabaseRules {
} }
impl DatabaseRules { impl DatabaseRules {
pub fn partition_key( pub fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
&self,
line: &ParsedLine<'_>,
default_time: &DateTime<Utc>,
) -> Result<String> {
self.partition_template.partition_key(line, default_time) self.partition_template.partition_key(line, default_time)
} }
@ -82,16 +78,12 @@ impl DatabaseRules {
/// Generates a partition key based on the line and the default time. /// Generates a partition key based on the line and the default time.
pub trait Partitioner { pub trait Partitioner {
fn partition_key( fn partition_key(&self, _line: &ParsedLine<'_>, _default_time: i64) -> Result<String>;
&self,
_line: &ParsedLine<'_>,
_default_time: &DateTime<Utc>,
) -> Result<String>;
} }
impl Partitioner for DatabaseRules { impl Partitioner for DatabaseRules {
fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
self.partition_key(&line, &default_time) self.partition_key(&line, default_time)
} }
} }
@ -291,7 +283,7 @@ pub struct PartitionTemplate {
} }
impl Partitioner for PartitionTemplate { impl Partitioner for PartitionTemplate {
fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { fn partition_key(&self, line: &ParsedLine<'_>, default_time: i64) -> Result<String> {
let parts: Vec<_> = self let parts: Vec<_> = self
.parts .parts
.iter() .iter()
@ -304,10 +296,10 @@ impl Partitioner for PartitionTemplate {
None => "".to_string(), None => "".to_string(),
}, },
}, },
TemplatePart::TimeFormat(format) => match line.timestamp { TemplatePart::TimeFormat(format) => {
Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(), let nanos = line.timestamp.unwrap_or(default_time);
None => default_time.format(&format).to_string(), Utc.timestamp_nanos(nanos).format(&format).to_string()
}, }
_ => unimplemented!(), _ => unimplemented!(),
}) })
.collect(); .collect();
@ -519,6 +511,8 @@ mod tests {
use super::*; use super::*;
use influxdb_line_protocol::parse_lines; use influxdb_line_protocol::parse_lines;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
#[test] #[test]
fn partition_key_with_table() { fn partition_key_with_table() {
let template = PartitionTemplate { let template = PartitionTemplate {
@ -526,7 +520,12 @@ mod tests {
}; };
let line = parse_line("cpu foo=1 10"); let line = parse_line("cpu foo=1 10");
assert_eq!("cpu", template.partition_key(&line, &Utc::now()).unwrap()); assert_eq!(
"cpu",
template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
);
} }
#[test] #[test]
@ -536,7 +535,12 @@ mod tests {
}; };
let line = parse_line("cpu foo=1 10"); let line = parse_line("cpu foo=1 10");
assert_eq!("foo_1", template.partition_key(&line, &Utc::now()).unwrap()); assert_eq!(
"foo_1",
template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
);
} }
#[test] #[test]
@ -548,7 +552,9 @@ mod tests {
let line = parse_line("cpu foo=1.1 10"); let line = parse_line("cpu foo=1.1 10");
assert_eq!( assert_eq!(
"foo_1.1", "foo_1.1",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }
@ -561,7 +567,9 @@ mod tests {
let line = parse_line("cpu foo=\"asdf\" 10"); let line = parse_line("cpu foo=\"asdf\" 10");
assert_eq!( assert_eq!(
"foo_asdf", "foo_asdf",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }
@ -574,7 +582,9 @@ mod tests {
let line = parse_line("cpu bar=true 10"); let line = parse_line("cpu bar=true 10");
assert_eq!( assert_eq!(
"bar_true", "bar_true",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }
@ -587,7 +597,9 @@ mod tests {
let line = parse_line("cpu,region=west usage_user=23.2 10"); let line = parse_line("cpu,region=west usage_user=23.2 10");
assert_eq!( assert_eq!(
"region_west", "region_west",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }
@ -598,7 +610,12 @@ mod tests {
}; };
let line = parse_line("cpu,foo=asdf bar=true 10"); let line = parse_line("cpu,foo=asdf bar=true 10");
assert_eq!("", template.partition_key(&line, &Utc::now()).unwrap()); assert_eq!(
"",
template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
);
} }
#[test] #[test]
@ -610,7 +627,9 @@ mod tests {
let line = parse_line("cpu,foo=asdf bar=true 1602338097000000000"); let line = parse_line("cpu,foo=asdf bar=true 1602338097000000000");
assert_eq!( assert_eq!(
"2020-10-10 13:54:57", "2020-10-10 13:54:57",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }
@ -625,7 +644,9 @@ mod tests {
let line = parse_line("cpu,foo=asdf bar=true"); let line = parse_line("cpu,foo=asdf bar=true");
assert_eq!( assert_eq!(
default_time.format(format_string).to_string(), default_time.format(format_string).to_string(),
template.partition_key(&line, &default_time).unwrap() template
.partition_key(&line, default_time.timestamp_nanos())
.unwrap()
); );
} }
@ -645,7 +666,9 @@ mod tests {
); );
assert_eq!( assert_eq!(
"cpu-region_west-usage_system_53.1-2020-10-10 13:54:57", "cpu-region_west-usage_system_53.1-2020-10-10 13:54:57",
template.partition_key(&line, &Utc::now()).unwrap() template
.partition_key(&line, ARBITRARY_DEFAULT_TIME)
.unwrap()
); );
} }

View File

@ -15,7 +15,9 @@ fn sequenced_entry(c: &mut Criterion) {
.collect::<Result<Vec<_>, _>>() .collect::<Result<Vec<_>, _>>()
.unwrap(); .unwrap();
let shard_config: Option<&ShardConfig> = None; let shard_config: Option<&ShardConfig> = None;
let sharded_entries = lines_to_sharded_entries(&lines, shard_config, &partitioner(1)).unwrap(); let default_time = 456;
let sharded_entries =
lines_to_sharded_entries(&lines, default_time, shard_config, &partitioner(1)).unwrap();
let entry = &sharded_entries.first().unwrap().entry; let entry = &sharded_entries.first().unwrap().entry;
let data = entry.data(); let data = entry.data();
assert_eq!( assert_eq!(

View File

@ -9,7 +9,7 @@ use std::{
num::NonZeroU64, num::NonZeroU64,
}; };
use chrono::{DateTime, Utc}; use chrono::Utc;
use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset}; use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
use ouroboros::self_referencing; use ouroboros::self_referencing;
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
@ -65,10 +65,10 @@ type ColumnResult<T, E = ColumnError> = std::result::Result<T, E>;
/// underlying flatbuffers bytes generated. /// underlying flatbuffers bytes generated.
pub fn lines_to_sharded_entries( pub fn lines_to_sharded_entries(
lines: &[ParsedLine<'_>], lines: &[ParsedLine<'_>],
default_time: i64,
sharder: Option<&impl Sharder>, sharder: Option<&impl Sharder>,
partitioner: &impl Partitioner, partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> { ) -> Result<Vec<ShardedEntry>> {
let default_time = Utc::now();
let mut sharded_lines = BTreeMap::new(); let mut sharded_lines = BTreeMap::new();
for line in lines { for line in lines {
@ -77,7 +77,7 @@ pub fn lines_to_sharded_entries(
None => None, None => None,
}; };
let partition_key = partitioner let partition_key = partitioner
.partition_key(line, &default_time) .partition_key(line, default_time)
.context(GeneratingPartitionKey)?; .context(GeneratingPartitionKey)?;
let table = line.series.measurement.as_str(); let table = line.series.measurement.as_str();
@ -91,11 +91,9 @@ pub fn lines_to_sharded_entries(
.push(line); .push(line);
} }
let default_time = Utc::now();
let sharded_entries = sharded_lines let sharded_entries = sharded_lines
.into_iter() .into_iter()
.map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, default_time))
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
Ok(sharded_entries) Ok(sharded_entries)
@ -104,7 +102,7 @@ pub fn lines_to_sharded_entries(
fn build_sharded_entry( fn build_sharded_entry(
shard_id: Option<ShardId>, shard_id: Option<ShardId>,
partitions: BTreeMap<String, BTreeMap<&str, Vec<&ParsedLine<'_>>>>, partitions: BTreeMap<String, BTreeMap<&str, Vec<&ParsedLine<'_>>>>,
default_time: &DateTime<Utc>, default_time: i64,
) -> Result<ShardedEntry> { ) -> Result<ShardedEntry> {
let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024); let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024);
@ -143,7 +141,7 @@ fn build_partition_write<'a>(
fbb: &mut FlatBufferBuilder<'a>, fbb: &mut FlatBufferBuilder<'a>,
partition_key: String, partition_key: String,
tables: BTreeMap<&str, Vec<&'a ParsedLine<'_>>>, tables: BTreeMap<&str, Vec<&'a ParsedLine<'_>>>,
default_time: &DateTime<Utc>, default_time: i64,
) -> Result<flatbuffers::WIPOffset<entry_fb::PartitionWrite<'a>>> { ) -> Result<flatbuffers::WIPOffset<entry_fb::PartitionWrite<'a>>> {
let partition_key = fbb.create_string(&partition_key); let partition_key = fbb.create_string(&partition_key);
@ -166,7 +164,7 @@ fn build_table_write_batch<'a>(
fbb: &mut FlatBufferBuilder<'a>, fbb: &mut FlatBufferBuilder<'a>,
table_name: &str, table_name: &str,
lines: Vec<&'a ParsedLine<'_>>, lines: Vec<&'a ParsedLine<'_>>,
default_time: &DateTime<Utc>, default_time: i64,
) -> Result<flatbuffers::WIPOffset<entry_fb::TableWriteBatch<'a>>> { ) -> Result<flatbuffers::WIPOffset<entry_fb::TableWriteBatch<'a>>> {
let mut columns = BTreeMap::new(); let mut columns = BTreeMap::new();
for (i, line) in lines.iter().enumerate() { for (i, line) in lines.iter().enumerate() {
@ -257,10 +255,7 @@ fn build_table_write_batch<'a>(
.entry(TIME_COLUMN_NAME) .entry(TIME_COLUMN_NAME)
.or_insert_with(ColumnBuilder::new_time_column); .or_insert_with(ColumnBuilder::new_time_column);
builder builder
.push_time( .push_time(line.timestamp.unwrap_or(default_time))
line.timestamp
.unwrap_or_else(|| default_time.timestamp_nanos()),
)
.context(TableColumnTypeMismatch { .context(TableColumnTypeMismatch {
table: table_name, table: table_name,
column: TIME_COLUMN_NAME, column: TIME_COLUMN_NAME,
@ -1530,7 +1525,14 @@ pub mod test_helpers {
pub fn lp_to_entry(lp: &str) -> Entry { pub fn lp_to_entry(lp: &str) -> Entry {
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &hour_partitioner()) let default_time = Utc::now().timestamp_nanos();
lines_to_sharded_entries(
&lines,
default_time,
sharder(1).as_ref(),
&hour_partitioner(),
)
.unwrap() .unwrap()
.pop() .pop()
.unwrap() .unwrap()
@ -1543,10 +1545,17 @@ pub mod test_helpers {
pub fn lp_to_entries(lp: &str) -> Vec<Entry> { pub fn lp_to_entries(lp: &str) -> Vec<Entry> {
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let default_time = Utc::now().timestamp_nanos();
lines lines
.chunks(LP_BATCH_SIZE) .chunks(LP_BATCH_SIZE)
.map(|batch| { .map(|batch| {
lines_to_sharded_entries(batch, sharder(1).as_ref(), &hour_partitioner()) lines_to_sharded_entries(
batch,
default_time,
sharder(1).as_ref(),
&hour_partitioner(),
)
.unwrap() .unwrap()
.pop() .pop()
.unwrap() .unwrap()
@ -1620,7 +1629,7 @@ pub mod test_helpers {
fn partition_key( fn partition_key(
&self, &self,
_line: &ParsedLine<'_>, _line: &ParsedLine<'_>,
_default_time: &DateTime<Utc>, _default_time: i64,
) -> data_types::database_rules::Result<String> { ) -> data_types::database_rules::Result<String> {
let n = *self.n.borrow(); let n = *self.n.borrow();
self.n.replace(n + 1); self.n.replace(n + 1);
@ -1636,14 +1645,13 @@ pub mod test_helpers {
fn partition_key( fn partition_key(
&self, &self,
line: &ParsedLine<'_>, line: &ParsedLine<'_>,
default_time: &DateTime<Utc>, default_time: i64,
) -> data_types::database_rules::Result<String> { ) -> data_types::database_rules::Result<String> {
const HOUR_FORMAT: &str = "%Y-%m-%dT%H"; const HOUR_FORMAT: &str = "%Y-%m-%dT%H";
let key = match line.timestamp { let key = Utc
Some(t) => Utc.timestamp_nanos(t).format(HOUR_FORMAT), .timestamp_nanos(line.timestamp.unwrap_or(default_time))
None => default_time.format(HOUR_FORMAT), .format(HOUR_FORMAT)
}
.to_string(); .to_string();
Ok(key) Ok(key)
@ -1659,6 +1667,8 @@ mod tests {
use super::test_helpers::*; use super::test_helpers::*;
use super::*; use super::*;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
#[test] #[test]
fn shards_lines() { fn shards_lines() {
let lp = vec![ let lp = vec![
@ -1669,8 +1679,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(2).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(2).as_ref(),
&partitioner(1),
)
.unwrap();
assert_eq!(sharded_entries.len(), 2); assert_eq!(sharded_entries.len(), 2);
assert_eq!(sharded_entries[0].shard_id, Some(0)); assert_eq!(sharded_entries[0].shard_id, Some(0));
@ -1687,8 +1702,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&partitioner(1),
)
.unwrap();
assert_eq!(sharded_entries.len(), 1); assert_eq!(sharded_entries.len(), 1);
assert_eq!(sharded_entries[0].shard_id, None); assert_eq!(sharded_entries[0].shard_id, None);
@ -1704,8 +1724,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(2)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(2),
)
.unwrap();
let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
assert_eq!(partition_writes.len(), 2); assert_eq!(partition_writes.len(), 2);
@ -1725,8 +1750,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
let table_batches = partition_writes[0].table_batches(); let table_batches = partition_writes[0].table_batches();
@ -1742,8 +1772,13 @@ mod tests {
let lp = vec!["a,host=a val=23 983", "a,host=a,region=west val2=23.2 2343"].join("\n"); let lp = vec!["a,host=a val=23 983", "a,host=a,region=west val2=23.2 2343"].join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); let partition_writes = sharded_entries[0].entry.partition_writes().unwrap();
let table_batches = partition_writes[0].table_batches(); let table_batches = partition_writes[0].table_batches();
@ -1784,8 +1819,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
@ -1855,8 +1895,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
@ -1980,8 +2025,13 @@ mod tests {
let lp = vec!["a val=1i 1"].join("\n"); let lp = vec!["a val=1i 1"].join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
.unwrap() .unwrap()
@ -2011,8 +2061,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
.unwrap() .unwrap()
@ -2055,8 +2110,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
.unwrap() .unwrap()
@ -2092,10 +2152,11 @@ mod tests {
let lp = vec!["a val=1i", "a val=2i 123"].join("\n"); let lp = vec!["a val=1i", "a val=2i 123"].join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let t = Utc::now().timestamp_nanos(); let default_time = Utc::now().timestamp_nanos();
let sharded_entries = let sharded_entries =
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
.unwrap();
let partition_writes = sharded_entries let partition_writes = sharded_entries
.first() .first()
@ -2110,17 +2171,74 @@ mod tests {
let col = columns.get(0).unwrap(); let col = columns.get(0).unwrap();
assert_eq!(col.name(), TIME_COLUMN_NAME); assert_eq!(col.name(), TIME_COLUMN_NAME);
let values = col.values().i64_values().unwrap(); let values = col.values().i64_values().unwrap();
assert!(values[0].unwrap() > t); assert_eq!(values[0].unwrap(), default_time);
assert_eq!(values[1], Some(123)); assert_eq!(values[1], Some(123));
} }
#[test]
fn missing_times_added_should_match_partition() {
use chrono::TimeZone;
let default_time = Utc
.datetime_from_str("2021-01-01 00:59:59 999999999", "%F %T %f")
.unwrap()
.timestamp_nanos();
// One point that has no timestamp
let lp = "a val=1i";
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
// Partition on the hour
let hour_partitioner = hour_partitioner();
// Extract the partition key the partitioned write was assigned
let sharded_entries =
lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &hour_partitioner)
.unwrap();
let partition_writes = sharded_entries
.first()
.unwrap()
.entry
.partition_writes()
.unwrap();
let partition_write = partition_writes.first().unwrap();
let assigned_partition_key = partition_write.key();
// Extract the timestamp the point was assigned
let table_batches = partition_write.table_batches();
let batch = table_batches.first().unwrap();
let columns = batch.columns();
let col = columns.get(0).unwrap();
assert_eq!(col.name(), TIME_COLUMN_NAME);
let values = col.values().i64_values().unwrap();
// Recreate the `ParsedLine` with the assigned timestamp to re-partition this point
let lp_with_assigned_timestamp = format!("{} {}", lp, values[0].unwrap());
let reparsed_lines: Vec<_> = parse_lines(&lp_with_assigned_timestamp)
.map(|l| l.unwrap())
.collect();
let point_key = hour_partitioner
.partition_key(
&reparsed_lines[0],
ARBITRARY_DEFAULT_TIME, // shouldn't be used since line now has timestamp
)
.unwrap();
// The point and the partitioned write it's in should have the same partition key
assert_eq!(point_key, assigned_partition_key);
}
#[test] #[test]
fn field_type_conflict() { fn field_type_conflict() {
let lp = vec!["a val=1i 1", "a val=2.1 123"].join("\n"); let lp = vec!["a val=1i 1", "a val=2.1 123"].join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
);
assert!(sharded_entries.is_err()); assert!(sharded_entries.is_err());
} }
@ -2130,8 +2248,12 @@ mod tests {
let lp = vec!["a,host=a val=1i 1", "a host=\"b\" 123"].join("\n"); let lp = vec!["a,host=a val=1i 1", "a host=\"b\" 123"].join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
);
assert!(sharded_entries.is_err()); assert!(sharded_entries.is_err());
} }
@ -2146,8 +2268,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data(); let entry_bytes = sharded_entries.first().unwrap().entry.data();
let clock_value = ClockValue::try_from(23).unwrap(); let clock_value = ClockValue::try_from(23).unwrap();
@ -2231,8 +2358,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data(); let entry_bytes = sharded_entries.first().unwrap().entry.data();
@ -2271,8 +2403,13 @@ mod tests {
.join("\n"); .join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries = let sharded_entries = lines_to_sharded_entries(
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); &lines,
ARBITRARY_DEFAULT_TIME,
sharder(1).as_ref(),
&partitioner(1),
)
.unwrap();
let entry_bytes = sharded_entries.first().unwrap().entry.data(); let entry_bytes = sharded_entries.first().unwrap().entry.data();

View File

@ -570,7 +570,15 @@ impl<M: ConnectionManager> Server<M> {
/// of ShardedEntry which are then sent to other IOx servers based on /// of ShardedEntry which are then sent to other IOx servers based on
/// the ShardConfig or sent to the local database for buffering in the /// the ShardConfig or sent to the local database for buffering in the
/// WriteBuffer and/or the MutableBuffer if configured. /// WriteBuffer and/or the MutableBuffer if configured.
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> { ///
/// The provided `default_time` is nanoseconds since the epoch and will be assigned
/// to any lines that don't have a timestamp.
pub async fn write_lines(
&self,
db_name: &str,
lines: &[ParsedLine<'_>],
default_time: i64,
) -> Result<()> {
self.require_id()?; self.require_id()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
@ -589,7 +597,8 @@ impl<M: ConnectionManager> Server<M> {
let rules = db.rules.read(); let rules = db.rules.read();
let shard_config = &rules.shard_config; let shard_config = &rules.shard_config;
let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules) let sharded_entries =
lines_to_sharded_entries(lines, default_time, shard_config.as_ref(), &*rules)
.context(LineConversion)?; .context(LineConversion)?;
let shards = shard_config let shards = shard_config
@ -1062,6 +1071,8 @@ mod tests {
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use tempfile::TempDir; use tempfile::TempDir;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
// TODO: perhaps switch to a builder pattern. // TODO: perhaps switch to a builder pattern.
fn config_with_metric_registry_and_store( fn config_with_metric_registry_and_store(
object_store: ObjectStore, object_store: ObjectStore,
@ -1108,7 +1119,10 @@ mod tests {
assert!(matches!(resp, Error::IdNotSet)); assert!(matches!(resp, Error::IdNotSet));
let lines = parsed_lines("cpu foo=1 10"); let lines = parsed_lines("cpu foo=1 10");
let resp = server.write_lines("foo", &lines).await.unwrap_err(); let resp = server
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap_err();
assert!(matches!(resp, Error::IdNotSet)); assert!(matches!(resp, Error::IdNotSet));
} }
@ -1317,7 +1331,10 @@ mod tests {
let line = "cpu bar=1 10"; let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
server.write_lines("foo", &lines).await.unwrap(); server
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap();
let db_name = DatabaseName::new("foo").unwrap(); let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).unwrap(); let db = server.db(&db_name).unwrap();
@ -1357,7 +1374,12 @@ mod tests {
let line = "cpu bar=1 10"; let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
let sharded_entries = lines_to_sharded_entries(&lines, NO_SHARD_CONFIG, &*db.rules.read()) let sharded_entries = lines_to_sharded_entries(
&lines,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
)
.expect("sharded entries"); .expect("sharded entries");
let entry = &sharded_entries[0].entry; let entry = &sharded_entries[0].entry;
@ -1457,14 +1479,20 @@ mod tests {
let line = "cpu bar=1 10"; let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
let err = server.write_lines(&db_name, &lines).await.unwrap_err(); let err = server
.write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap_err();
assert!( assert!(
matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids) matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids)
); );
// one remote is configured but it's down and we'll get connection error // one remote is configured but it's down and we'll get connection error
server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into()); server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
let err = server.write_lines(&db_name, &lines).await.unwrap_err(); let err = server
.write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap_err();
assert!(matches!( assert!(matches!(
err, err,
Error::NoRemoteReachable { errors } if matches!( Error::NoRemoteReachable { errors } if matches!(
@ -1484,7 +1512,7 @@ mod tests {
// probability both the remotes will get hit. // probability both the remotes will get hit.
for _ in 0..100 { for _ in 0..100 {
server server
.write_lines(&db_name, &lines) .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await .await
.expect("cannot write lines"); .expect("cannot write lines");
} }
@ -1514,7 +1542,10 @@ mod tests {
let line = "cpu bar=1 10"; let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
server.write_lines(&db_name, &lines).await.unwrap(); server
.write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap();
// start the close (note this is not an async) // start the close (note this is not an async)
let partition_key = ""; let partition_key = "";
@ -1678,8 +1709,12 @@ mod tests {
// inserting first line does not trigger hard buffer limit // inserting first line does not trigger hard buffer limit
let line_1 = "cpu bar=1 10"; let line_1 = "cpu bar=1 10";
let lines_1: Vec<_> = parse_lines(line_1).map(|l| l.unwrap()).collect(); let lines_1: Vec<_> = parse_lines(line_1).map(|l| l.unwrap()).collect();
let sharded_entries_1 = let sharded_entries_1 = lines_to_sharded_entries(
lines_to_sharded_entries(&lines_1, NO_SHARD_CONFIG, &*db.rules.read()) &lines_1,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
)
.expect("first sharded entries"); .expect("first sharded entries");
let entry_1 = &sharded_entries_1[0].entry; let entry_1 = &sharded_entries_1[0].entry;
@ -1691,8 +1726,12 @@ mod tests {
// inserting second line will // inserting second line will
let line_2 = "cpu bar=2 20"; let line_2 = "cpu bar=2 20";
let lines_2: Vec<_> = parse_lines(line_2).map(|l| l.unwrap()).collect(); let lines_2: Vec<_> = parse_lines(line_2).map(|l| l.unwrap()).collect();
let sharded_entries_2 = let sharded_entries_2 = lines_to_sharded_entries(
lines_to_sharded_entries(&lines_2, NO_SHARD_CONFIG, &*db.rules.read()) &lines_2,
ARBITRARY_DEFAULT_TIME,
NO_SHARD_CONFIG,
&*db.rules.read(),
)
.expect("second sharded entries"); .expect("second sharded entries");
let entry_2 = &sharded_entries_2[0].entry; let entry_2 = &sharded_entries_2[0].entry;
let res = server.write_entry("foo", entry_2.data().into()).await; let res = server.write_entry("foo", entry_2.data().into()).await;

View File

@ -23,6 +23,7 @@ use server::{ConnectionManager, Server as AppServer};
// External crates // External crates
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use chrono::Utc;
use futures::{self, StreamExt}; use futures::{self, StreamExt};
use http::header::{CONTENT_ENCODING, CONTENT_TYPE}; use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode}; use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
@ -480,6 +481,10 @@ where
let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?; let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
// The time, in nanoseconds since the epoch, to assign to any points that don't
// contain a timestamp
let default_time = Utc::now().timestamp_nanos();
let mut num_fields = 0; let mut num_fields = 0;
let mut num_lines = 0; let mut num_lines = 0;
@ -501,7 +506,10 @@ where
KeyValue::new("path", path), KeyValue::new("path", path),
]; ];
server.write_lines(&db_name, &lines).await.map_err(|e| { server
.write_lines(&db_name, &lines, default_time)
.await
.map_err(|e| {
let labels = &[ let labels = &[
metrics::KeyValue::new("status", "error"), metrics::KeyValue::new("status", "error"),
metrics::KeyValue::new("db_name", db_name.to_string()), metrics::KeyValue::new("db_name", db_name.to_string()),

View File

@ -1,5 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use chrono::Utc;
use generated_types::{google::FieldViolation, influxdata::iox::write::v1::*}; use generated_types::{google::FieldViolation, influxdata::iox::write::v1::*};
use influxdb_line_protocol::parse_lines; use influxdb_line_protocol::parse_lines;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
@ -25,6 +26,10 @@ where
) -> Result<tonic::Response<WriteResponse>, tonic::Status> { ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let request = request.into_inner(); let request = request.into_inner();
// The time, in nanoseconds since the epoch, to assign to any points that don't
// contain a timestamp
let default_time = Utc::now().timestamp_nanos();
let db_name = request.db_name; let db_name = request.db_name;
let lp_data = request.lp_data; let lp_data = request.lp_data;
let lp_chars = lp_data.len(); let lp_chars = lp_data.len();
@ -40,7 +45,7 @@ where
debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database"); debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
self.server self.server
.write_lines(&db_name, &lines) .write_lines(&db_name, &lines, default_time)
.await .await
.map_err(default_server_error_handler)?; .map_err(default_server_error_handler)?;

View File

@ -93,8 +93,10 @@ async fn test_write_entry() {
let lp_data = vec!["cpu bar=1 10", "cpu bar=2 20"].join("\n"); let lp_data = vec!["cpu bar=1 10", "cpu bar=2 20"].join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
let default_time = 456;
let sharded_entries = let sharded_entries =
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
.unwrap();
let entry: Vec<u8> = sharded_entries.into_iter().next().unwrap().entry.into(); let entry: Vec<u8> = sharded_entries.into_iter().next().unwrap().entry.into();