refactor: Clean up timestamp handling logic and avoid a conversion (#4988)
* refactor: Clean up timestamp handling logic * fix: Remove unused timestamp function Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
1ef9c7ceda
commit
0c705fecf1
|
@ -82,14 +82,6 @@ impl Time {
|
||||||
Self(Utc.timestamp_millis(millis))
|
Self(Utc.timestamp_millis(millis))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Makes a new `DateTime` from the number of non-leap milliseconds
|
|
||||||
/// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp").
|
|
||||||
///
|
|
||||||
/// Returns None if out of range
|
|
||||||
pub fn from_timestamp_millis_opt(millis: i64) -> Option<Self> {
|
|
||||||
Some(Self(Utc.timestamp_millis_opt(millis).single()?))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Makes a new `Time` from the number of non-leap seconds
|
/// Makes a new `Time` from the number of non-leap seconds
|
||||||
/// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp")
|
/// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp")
|
||||||
/// and the number of nanoseconds since the last whole non-leap second.
|
/// and the number of nanoseconds since the last whole non-leap second.
|
||||||
|
@ -324,8 +316,6 @@ mod test {
|
||||||
assert!(chrono::Duration::from_std(duration).is_err());
|
assert!(chrono::Duration::from_std(duration).is_err());
|
||||||
assert!(time.checked_add(duration).is_none());
|
assert!(time.checked_add(duration).is_none());
|
||||||
assert!(time.checked_sub(duration).is_none());
|
assert!(time.checked_sub(duration).is_none());
|
||||||
|
|
||||||
assert!(Time::from_timestamp_millis_opt(i64::MAX).is_none())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -191,18 +191,10 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
|
||||||
sequence_number: SequenceNumber::new(record.offset),
|
sequence_number: SequenceNumber::new(record.offset),
|
||||||
};
|
};
|
||||||
|
|
||||||
let timestamp_millis =
|
let timestamp_nanos = i64::try_from(record.record.timestamp.unix_timestamp_nanos())
|
||||||
i64::try_from(record.record.timestamp.unix_timestamp_nanos() / 1_000_000)
|
.map_err(WriteBufferError::invalid_data)?;
|
||||||
.map_err(WriteBufferError::invalid_data)?;
|
|
||||||
|
|
||||||
let timestamp = Time::from_timestamp_millis_opt(timestamp_millis)
|
let timestamp = Time::from_timestamp_nanos(timestamp_nanos);
|
||||||
.ok_or_else::<WriteBufferError, _>(|| {
|
|
||||||
format!(
|
|
||||||
"Cannot parse timestamp for milliseconds: {}",
|
|
||||||
timestamp_millis
|
|
||||||
)
|
|
||||||
.into()
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let value = record
|
let value = record
|
||||||
.record
|
.record
|
||||||
|
|
Loading…
Reference in New Issue