Merge branch 'main' into crepererum/ingest_wallclock

pull/24376/head
kodiakhq[bot] 2021-07-28 13:49:08 +00:00 committed by GitHub
commit 7b73190d79
18 changed files with 495 additions and 488 deletions


@ -241,16 +241,19 @@ jobs:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
# We keep the debug symbols (Enabled in Cargo.toml as debug = true)
# workaround dynamic CPU detection bug in croaring
# https://github.com/influxdata/influxdb_iox/pull/2119
ROARING_ARCH: "haswell"
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Print rustc target CPU options
command: cargo build --release --features="aws,gcp,azure" --bin print_cpu
command: cargo run --release --features="aws,gcp,azure" --bin print_cpu
- run:
name: Cargo release build with target arch set for CRoaring
command: ROARING_ARCH=haswell cargo build --release --features="aws,gcp,azure"
command: cargo build --release --features="aws,gcp,azure"
- run: |
echo sha256sum after build is
sha256sum target/release/influxdb_iox
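The print_cpu step above exists to surface which CPU features the release binary targets, since the croaring workaround pins ROARING_ARCH to "haswell". The repository's actual print_cpu binary may do something different; the following is only a hypothetical Rust sketch of what such a diagnostic could look like, using standard-library feature detection.

// Hypothetical sketch of a print_cpu-style diagnostic; not the binary built in this CI job.
fn main() {
    // Target features baked in at compile time.
    println!("compiled with avx2: {}", cfg!(target_feature = "avx2"));
    println!("compiled with bmi2: {}", cfg!(target_feature = "bmi2"));

    // Features detected at runtime on the machine executing the binary (x86_64 only).
    #[cfg(target_arch = "x86_64")]
    {
        println!("cpu supports avx2: {}", std::is_x86_feature_detected!("avx2"));
        println!("cpu supports bmi2: {}", std::is_x86_feature_detected!("bmi2"));
    }
}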


@ -1,9 +1,8 @@
//! Module contains a representation of chunk metadata
use std::sync::Arc;
use crate::partition_metadata::PartitionAddr;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Address of the chunk within the catalog
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@ -141,12 +140,12 @@ pub struct ChunkSummary {
/// The earliest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into
pub time_of_first_write: Option<DateTime<Utc>>,
pub time_of_first_write: DateTime<Utc>,
/// The latest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into
pub time_of_last_write: Option<DateTime<Utc>>,
pub time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself
@ -174,66 +173,14 @@ pub struct DetailedChunkSummary {
}
impl ChunkSummary {
/// Construct a ChunkSummary that has None for all timestamps
#[allow(clippy::too_many_arguments)]
pub fn new_without_timestamps(
partition_key: Arc<str>,
table_name: Arc<str>,
id: u32,
storage: ChunkStorage,
lifecycle_action: Option<ChunkLifecycleAction>,
memory_bytes: usize,
object_store_bytes: usize,
row_count: usize,
) -> Self {
Self {
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
}
}
/// Return a new ChunkSummary with None for all timestamps
pub fn normalize(self) -> Self {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
..
} = self;
Self::new_without_timestamps(
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
)
}
/// Normalizes a set of ChunkSummaries for comparison by removing timestamps
pub fn normalize_summaries(summaries: Vec<Self>) -> Vec<Self> {
let mut summaries = summaries
.into_iter()
.map(|summary| summary.normalize())
.collect::<Vec<_>>();
summaries.sort_unstable();
summaries
pub fn equal_without_timestamps(&self, other: &Self) -> bool {
self.partition_key == other.partition_key
&& self.table_name == other.table_name
&& self.id == other.id
&& self.storage == other.storage
&& self.lifecycle_action == other.lifecycle_action
&& self.memory_bytes == other.memory_bytes
&& self.object_store_bytes == other.object_store_bytes
&& self.row_count == other.row_count
}
}
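// Sketch, not part of this diff: a test one might add now that the write timestamps are plain
// DateTime<Utc> values rather than Option. Field values are illustrative; the field set matches
// the struct definition above.
fn summary_written_at(t: DateTime<Utc>) -> ChunkSummary {
    ChunkSummary {
        partition_key: Arc::from("1970-01-05T15"),
        table_name: Arc::from("cpu"),
        id: 0,
        storage: ChunkStorage::OpenMutableBuffer,
        lifecycle_action: None,
        memory_bytes: 70,
        object_store_bytes: 0,
        row_count: 1,
        time_of_last_access: None,
        time_of_first_write: t,
        time_of_last_write: t,
        time_closed: None,
    }
}

#[test]
fn summaries_compare_ignoring_timestamps() {
    use chrono::TimeZone;
    let a = summary_written_at(Utc.timestamp_nanos(1));
    let b = summary_written_at(Utc.timestamp_nanos(2)); // same data, later wall-clock write time
    assert!(a.equal_without_timestamps(&b));
    assert_ne!(a, b); // full equality still sees the differing timestamps
}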


@ -1,8 +1,12 @@
use crate::google::{FieldViolation, FromFieldOpt};
use crate::influxdata::iox::management::v1 as management;
use crate::{
google::{FieldViolation, FromFieldOpt},
influxdata::iox::management::v1 as management,
};
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage, ChunkSummary};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
/// Conversion code to management API chunk structure
impl From<ChunkSummary> for management::Chunk {
@ -32,8 +36,8 @@ impl From<ChunkSummary> for management::Chunk {
object_store_bytes: object_store_bytes as u64,
row_count: row_count as u64,
time_of_last_access: time_of_last_access.map(Into::into),
time_of_first_write: time_of_first_write.map(Into::into),
time_of_last_write: time_of_last_write.map(Into::into),
time_of_first_write: Some(time_of_first_write.into()),
time_of_last_write: Some(time_of_last_write.into()),
time_closed: time_closed.map(Into::into),
}
}
@ -79,6 +83,15 @@ impl TryFrom<management::Chunk> for ChunkSummary {
t.map(|t| convert_timestamp(t, field)).transpose()
};
let required_timestamp = |t: Option<google_types::protobuf::Timestamp>,
field: &'static str| {
t.ok_or_else(|| FieldViolation {
field: field.to_string(),
description: "Timestamp is required".to_string(),
})
.and_then(|t| convert_timestamp(t, field))
};
let management::Chunk {
partition_key,
table_name,
@ -105,8 +118,8 @@ impl TryFrom<management::Chunk> for ChunkSummary {
object_store_bytes: object_store_bytes as usize,
row_count: row_count as usize,
time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?,
time_of_first_write: timestamp(time_of_first_write, "time_of_first_write")?,
time_of_last_write: timestamp(time_of_last_write, "time_of_last_write")?,
time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
time_closed: timestamp(time_closed, "time_closed")?,
})
}
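// Sketch, not part of this diff (it would naturally live in the test module below): with
// required_timestamp above, a proto Chunk that omits time_of_first_write or time_of_last_write
// no longer converts to a summary carrying None; the conversion fails. Assumes management::Chunk
// derives Default, as prost-generated types do.
#[test]
fn proto_missing_write_timestamp_is_rejected() {
    use chrono::Utc;
    use std::convert::TryFrom;

    let proto = management::Chunk {
        partition_key: "foo".to_string(),
        table_name: "bar".to_string(),
        storage: management::ChunkStorage::ObjectStoreOnly.into(),
        lifecycle_action: management::ChunkLifecycleAction::Moving.into(),
        time_of_first_write: None, // required as of this change
        time_of_last_write: Some(Utc::now().into()),
        ..Default::default()
    };
    assert!(ChunkSummary::try_from(proto).is_err());
}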
@ -154,6 +167,7 @@ mod test {
#[test]
fn valid_proto_to_summary() {
let now = Utc::now();
let proto = management::Chunk {
partition_key: "foo".to_string(),
table_name: "bar".to_string(),
@ -164,8 +178,8 @@ mod test {
storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Moving.into(),
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Some(now.into()),
time_of_last_write: Some(now.into()),
time_closed: None,
time_of_last_access: Some(google_types::protobuf::Timestamp {
seconds: 50,
@ -183,8 +197,8 @@ mod test {
row_count: 321,
storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Moving),
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: now,
time_of_last_write: now,
time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
};
@ -198,6 +212,7 @@ mod test {
#[test]
fn valid_summary_to_proto() {
let now = Utc::now();
let summary = ChunkSummary {
partition_key: Arc::from("foo"),
table_name: Arc::from("bar"),
@ -207,8 +222,8 @@ mod test {
row_count: 321,
storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: now,
time_of_last_write: now,
time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
};
@ -224,8 +239,8 @@ mod test {
row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Persisting.into(),
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Some(now.into()),
time_of_last_write: Some(now.into()),
time_closed: None,
time_of_last_access: Some(google_types::protobuf::Timestamp {
seconds: 12,


@ -9,18 +9,19 @@
)]
use chrono::{DateTime, Utc};
use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use data_types::DatabaseName;
pub use guard::*;
use data_types::{
chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
DatabaseName,
};
use internal_types::access::AccessMetrics;
pub use policy::*;
use std::time::Instant;
use tracker::TaskTracker;
mod guard;
pub use guard::*;
mod policy;
pub use policy::*;
/// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
pub trait LifecycleDb {
@ -177,7 +178,7 @@ pub trait LifecycleChunk {
/// Returns the access metrics for this chunk
fn access_metrics(&self) -> AccessMetrics;
fn time_of_last_write(&self) -> Option<DateTime<Utc>>;
fn time_of_last_write(&self) -> DateTime<Utc>;
fn addr(&self) -> &ChunkAddr;


@ -1,21 +1,22 @@
use std::convert::TryInto;
use std::fmt::Debug;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use data_types::DatabaseName;
use futures::future::BoxFuture;
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use observability_deps::tracing::{debug, info, trace, warn};
use tracker::TaskTracker;
use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle,
};
use chrono::{DateTime, Utc};
use data_types::{
chunk_metadata::{ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
DatabaseName,
};
use futures::future::BoxFuture;
use internal_types::access::AccessMetrics;
use observability_deps::tracing::{debug, info, trace, warn};
use std::{
convert::TryInto,
fmt::Debug,
time::{Duration, Instant},
};
use tracker::TaskTracker;
/// Number of seconds to wait before retrying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
@ -613,16 +614,7 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
return true;
}
match chunk.time_of_last_write() {
Some(last_write)
if elapsed_seconds(now, last_write) >= rules.late_arrive_window_seconds.get() =>
{
true
}
// Disable movement if the chunk is empty, or the linger hasn't expired
_ => false,
}
elapsed_seconds(now, chunk.time_of_last_write()) >= rules.late_arrive_window_seconds.get()
}
/// An action to free up memory
@ -654,21 +646,20 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
#[cfg(test)]
mod tests {
use std::cmp::max;
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::Arc;
use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry};
use super::*;
use crate::{
ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle,
};
use super::*;
use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
use std::{
cmp::max,
collections::BTreeMap,
convert::Infallible,
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry};
#[derive(Debug, Eq, PartialEq)]
enum MoverEvents {
@ -710,13 +701,13 @@ mod tests {
row_count: usize,
min_timestamp: Option<DateTime<Utc>>,
access_metrics: AccessMetrics,
time_of_last_write: Option<DateTime<Utc>>,
time_of_last_write: DateTime<Utc>,
lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
storage: ChunkStorage,
}
impl TestChunk {
fn new(id: u32, time_of_last_write: Option<i64>, storage: ChunkStorage) -> Self {
fn new(id: u32, time_of_last_write: i64, storage: ChunkStorage) -> Self {
let addr = ChunkAddr {
db_name: Arc::from(""),
table_name: Arc::from(""),
@ -732,7 +723,7 @@ mod tests {
count: 0,
last_instant: Instant::now(),
},
time_of_last_write: time_of_last_write.map(from_secs),
time_of_last_write: from_secs(time_of_last_write),
lifecycle_action: None,
storage,
}
@ -836,7 +827,7 @@ mod tests {
let id = partition.next_id;
partition.next_id += 1;
let mut new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer);
let mut new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer);
new_chunk.row_count = 0;
for chunk in &chunks {
@ -876,7 +867,7 @@ mod tests {
partition.next_id += 1;
// The remainder left behind after the split
let new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer)
let new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1));
partition
@ -973,7 +964,7 @@ mod tests {
self.access_metrics.clone()
}
fn time_of_last_write(&self) -> Option<DateTime<Utc>> {
fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write
}
@ -1094,20 +1085,20 @@ mod tests {
mub_row_threshold: NonZeroUsize::new(74).unwrap(),
..Default::default()
};
let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer);
let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer);
assert!(!can_move(&rules, &chunk, from_secs(9)));
assert!(can_move(&rules, &chunk, from_secs(11)));
// can move even if the chunk is small
let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73);
let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(can_move(&rules, &chunk, from_secs(11)));
// If over the row count threshold, we should be able to move
let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(74);
let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(74);
assert!(can_move(&rules, &chunk, from_secs(0)));
// If below the default row count threshold, it shouldn't move
let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(73);
let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(!can_move(&rules, &chunk, from_secs(0)));
}
@ -1167,9 +1158,9 @@ mod tests {
// The default rules shouldn't do anything
let rules = LifecycleRules::default();
let chunks = vec![
TestChunk::new(0, Some(1), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(1), ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(1), ChunkStorage::OpenMutableBuffer),
TestChunk::new(0, 1, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, 1, ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, 1, ChunkStorage::OpenMutableBuffer),
];
let db = TestDb::new(rules, chunks);
@ -1185,9 +1176,9 @@ mod tests {
..Default::default()
};
let chunks = vec![
TestChunk::new(0, Some(8), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(5), ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(0, 8, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, 5, ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, 0, ChunkStorage::OpenMutableBuffer),
];
let db = TestDb::new(rules, chunks);
@ -1262,7 +1253,7 @@ mod tests {
..Default::default()
};
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1272,29 +1263,30 @@ mod tests {
let instant = Instant::now();
let chunks =
vec![
// two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer),
// "moved" chunk => can be dropped because `drop_non_persistent=true`
TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be unloaded while write is in-progress
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
.with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded
TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 1,
last_instant: instant,
}),
// "written" chunk => can be unloaded
TestChunk::new(5, Some(0), ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 12,
last_instant: instant - Duration::from_secs(1),
}),
];
let chunks = vec![
// two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer),
// "moved" chunk => can be dropped because `drop_non_persistent=true`
TestChunk::new(2, 0, ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be unloaded while write is in-progress
TestChunk::new(3, 0, ChunkStorage::ReadBuffer)
.with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded
TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics(
AccessMetrics {
count: 1,
last_instant: instant,
},
),
// "written" chunk => can be unloaded
TestChunk::new(5, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics(
AccessMetrics {
count: 12,
last_instant: instant - Duration::from_secs(1),
},
),
];
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1324,7 +1316,7 @@ mod tests {
};
assert!(!rules.drop_non_persisted);
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1334,15 +1326,15 @@ mod tests {
let chunks = vec![
// two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer),
// "moved" chunk => cannot be dropped because `drop_non_persistent=false`
TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer),
TestChunk::new(2, 0, ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be drop while write is in-progess
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
TestChunk::new(3, 0, ChunkStorage::ReadBuffer)
.with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded
TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore),
TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore),
];
let db = TestDb::new(rules, chunks);
@ -1360,7 +1352,7 @@ mod tests {
..Default::default()
};
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1383,55 +1375,55 @@ mod tests {
let partitions = vec![
TestPartition::new(vec![
// still receiving writes => cannot compact
TestChunk::new(0, Some(20), ChunkStorage::OpenMutableBuffer),
TestChunk::new(0, 20, ChunkStorage::OpenMutableBuffer),
]),
TestPartition::new(vec![
// still receiving writes => cannot compact
TestChunk::new(1, Some(20), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, 20, ChunkStorage::OpenMutableBuffer),
// closed => can compact
TestChunk::new(2, Some(20), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(2, 20, ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// open but cold => can compact
TestChunk::new(3, Some(5), ChunkStorage::OpenMutableBuffer),
TestChunk::new(3, 5, ChunkStorage::OpenMutableBuffer),
// closed => can compact
TestChunk::new(4, Some(20), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(4, 20, ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(5, Some(20), ChunkStorage::ReadBuffer),
TestChunk::new(5, 20, ChunkStorage::ReadBuffer),
// persisted => cannot compact
TestChunk::new(6, Some(20), ChunkStorage::ReadBufferAndObjectStore),
TestChunk::new(6, 20, ChunkStorage::ReadBufferAndObjectStore),
// persisted => cannot compact
TestChunk::new(7, Some(20), ChunkStorage::ObjectStoreOnly),
TestChunk::new(7, 20, ChunkStorage::ObjectStoreOnly),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(8, Some(20), ChunkStorage::ReadBuffer),
TestChunk::new(8, 20, ChunkStorage::ReadBuffer),
// closed => can compact
TestChunk::new(9, Some(20), ChunkStorage::ReadBuffer),
TestChunk::new(9, 20, ChunkStorage::ReadBuffer),
// persisted => cannot compact
TestChunk::new(10, Some(20), ChunkStorage::ReadBufferAndObjectStore),
TestChunk::new(10, 20, ChunkStorage::ReadBufferAndObjectStore),
// persisted => cannot compact
TestChunk::new(11, Some(20), ChunkStorage::ObjectStoreOnly),
TestChunk::new(11, 20, ChunkStorage::ObjectStoreOnly),
]),
TestPartition::new(vec![
// open but cold => can compact
TestChunk::new(12, Some(5), ChunkStorage::OpenMutableBuffer),
TestChunk::new(12, 5, ChunkStorage::OpenMutableBuffer),
]),
TestPartition::new(vec![
// already compacted => should not compact
TestChunk::new(13, Some(5), ChunkStorage::ReadBuffer),
TestChunk::new(13, 5, ChunkStorage::ReadBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(14, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
TestChunk::new(14, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many individual rows => ignore
TestChunk::new(15, Some(20), ChunkStorage::ReadBuffer).with_row_count(1_000),
TestChunk::new(15, 20, ChunkStorage::ReadBuffer).with_row_count(1_000),
// closed => can compact
TestChunk::new(16, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
TestChunk::new(16, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job
TestChunk::new(17, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
TestChunk::new(17, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job
TestChunk::new(18, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
TestChunk::new(18, 20, ChunkStorage::ReadBuffer).with_row_count(400),
]),
];
@ -1466,19 +1458,19 @@ mod tests {
let partitions = vec![
TestPartition::new(vec![
// closed => can compact
TestChunk::new(0, Some(20), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(0, 20, ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(10, Some(30), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(10, 30, ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(12, Some(40), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(12, 40, ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(1, Some(20), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(1, 20, ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(200, Some(10), ChunkStorage::ClosedMutableBuffer),
TestChunk::new(200, 10, ChunkStorage::ClosedMutableBuffer),
]),
];
@ -1516,65 +1508,59 @@ mod tests {
let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact
TestPartition::new(vec![
TestChunk::new(0, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(1, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(1, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(10, now, from_secs(20)),
// Sufficient rows => persist
TestPartition::new(vec![
TestChunk::new(2, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
// Writes too old => persist
TestPartition::new(vec![
// Should split open chunks
TestChunk::new(4, Some(20), ChunkStorage::OpenMutableBuffer)
TestChunk::new(4, 20, ChunkStorage::OpenMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(5, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(6, Some(0), ChunkStorage::ObjectStoreOnly)
TestChunk::new(5, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
TestChunk::new(6, 0, ChunkStorage::ObjectStoreOnly)
.with_min_timestamp(from_secs(5)),
])
.with_persistence(10, now - Duration::from_secs(10), from_secs(20)),
// Sufficient rows but conflicting compaction => prevent compaction
TestPartition::new(vec![
TestChunk::new(7, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(7, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10))
.with_action(ChunkLifecycleAction::Compacting),
// This chunk would be a compaction candidate, but we want to persist it
TestChunk::new(8, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(8, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(9, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(9, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
// Sufficient rows and non-conflicting compaction => persist
TestPartition::new(vec![
TestChunk::new(10, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(10, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(11, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(11, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(12, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(12, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
// Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact
TestPartition::new(vec![
TestChunk::new(13, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(13, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(14, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(14, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)),
TestChunk::new(15, Some(0), ChunkStorage::ClosedMutableBuffer)
TestChunk::new(15, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(16, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
TestChunk::new(16, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
];
@ -1604,7 +1590,7 @@ mod tests {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
..Default::default()
};
let chunks = vec![TestChunk::new(0, Some(40), ChunkStorage::OpenMutableBuffer)];
let chunks = vec![TestChunk::new(0, 40, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1619,11 +1605,7 @@ mod tests {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
..Default::default()
};
let chunks = vec![TestChunk::new(
0,
Some(40),
ChunkStorage::ClosedMutableBuffer,
)];
let chunks = vec![TestChunk::new(0, 40, ChunkStorage::ClosedMutableBuffer)];
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
@ -1635,7 +1617,7 @@ mod tests {
#[test]
fn test_recovers_lifecycle_action() {
let rules = LifecycleRules::default();
let chunks = vec![TestChunk::new(0, None, ChunkStorage::ClosedMutableBuffer)];
let chunks = vec![TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer)];
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);


@ -1,15 +1,17 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use crate::db::catalog::chunk::ChunkStage;
use crate::db::catalog::table::TableSchemaUpsertHandle;
pub(crate) use crate::db::chunk::DbChunk;
use crate::db::lifecycle::ArcDb;
use crate::{
db::{
access::QueryCatalogAccess,
catalog::{chunk::CatalogChunk, partition::Partition, Catalog, TableNameFilter},
lifecycle::{LockableCatalogChunk, LockableCatalogPartition},
catalog::{
chunk::{CatalogChunk, ChunkStage},
partition::Partition,
table::TableSchemaUpsertHandle,
Catalog, TableNameFilter,
},
lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition},
},
JobRegistry,
};
@ -31,13 +33,11 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info};
use parking_lot::RwLock;
use parquet_file::catalog::CatalogParquetInfo;
use parquet_file::{
catalog::{CheckpointData, PreservedCatalog},
catalog::{CatalogParquetInfo, CheckpointData, PreservedCatalog},
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
};
use persistence_windows::checkpoint::ReplayPlan;
use persistence_windows::persistence_windows::PersistenceWindows;
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@ -50,8 +50,10 @@ use std::{
},
time::{Duration, Instant},
};
use write_buffer::config::WriteBufferConfig;
use write_buffer::core::{FetchHighWatermark, WriteBufferError};
use write_buffer::{
config::WriteBufferConfig,
core::{FetchHighWatermark, WriteBufferError},
};
pub mod access;
pub mod catalog;
@ -1345,12 +1347,12 @@ mod tests {
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes;
use chrono::DateTime;
use data_types::write_summary::TimestampSummary;
use chrono::{DateTime, TimeZone};
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
write_summary::TimestampSummary,
};
use entry::{test_helpers::lp_to_entry, Sequence};
use futures::{stream, StreamExt, TryStreamExt};
@ -1991,13 +1993,14 @@ mod tests {
#[tokio::test]
async fn write_metrics() {
std::env::set_var("INFLUXDB_IOX_ROW_TIMESTAMP_METRICS", "write_metrics_test");
let test_db = make_db().await;
let db = Arc::clone(&test_db.db);
write_lp(db.as_ref(), "cpu foo=1 100000000000").await;
write_lp(db.as_ref(), "cpu foo=2 180000000000").await;
write_lp(db.as_ref(), "cpu foo=3 650000000000").await;
write_lp(db.as_ref(), "cpu foo=3 650000000010").await;
write_lp(db.as_ref(), "write_metrics_test foo=1 100000000000").await;
write_lp(db.as_ref(), "write_metrics_test foo=2 180000000000").await;
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000000").await;
write_lp(db.as_ref(), "write_metrics_test foo=3 650000000010").await;
let mut summary = TimestampSummary::default();
summary.record(Utc.timestamp_nanos(100000000000));
@ -2013,7 +2016,7 @@ mod tests {
.with_labels(&[
("svr_id", "1"),
("db_name", "placeholder"),
("table", "cpu"),
("table", "write_metrics_test"),
("le", minute.as_str()),
])
.counter()
@ -2653,10 +2656,7 @@ mod tests {
let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read();
(
partition.last_write_at(),
chunk.time_of_last_write().unwrap(),
)
(partition.last_write_at(), chunk.time_of_last_write())
};
let entry = lp_to_entry("cpu bar=true 10");
@ -2668,7 +2668,7 @@ mod tests {
assert_eq!(last_write_prev, partition.last_write_at());
let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read();
assert_eq!(chunk_last_write_prev, chunk.time_of_last_write().unwrap());
assert_eq!(chunk_last_write_prev, chunk.time_of_last_write());
}
}
@ -2821,9 +2821,9 @@ mod tests {
println!("Chunk: {:#?}", chunk);
// then the chunk creation and rollover times are as expected
assert!(start < chunk.time_of_first_write().unwrap());
assert!(chunk.time_of_first_write().unwrap() < after_data_load);
assert!(chunk.time_of_first_write().unwrap() == chunk.time_of_last_write().unwrap());
assert!(start < chunk.time_of_first_write());
assert!(chunk.time_of_first_write() < after_data_load);
assert!(chunk.time_of_first_write() == chunk.time_of_last_write());
assert!(after_data_load < chunk.time_closed().unwrap());
assert!(chunk.time_closed().unwrap() < after_rollover);
}
@ -2883,18 +2883,21 @@ mod tests {
print!("Partitions: {:?}", db.partition_keys().unwrap());
let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let expected = vec![ChunkSummary::new_without_timestamps(
Arc::from("1970-01-05T15"),
Arc::from("cpu"),
0,
ChunkStorage::OpenMutableBuffer,
None,
70, // memory_size
0, // os_size
1,
)];
let expected = vec![ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
table_name: Arc::from("cpu"),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
memory_bytes: 70, // memory_size
object_store_bytes: 0, // os_size
row_count: 1,
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
}];
let size: usize = db
.chunk_summaries()
@ -2905,11 +2908,14 @@ mod tests {
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size);
assert_eq!(
expected, chunk_summaries,
"expected:\n{:#?}\n\nactual:{:#?}\n\n",
expected, chunk_summaries
);
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert!(
expected_summary.equal_without_timestamps(&actual_summary),
"expected:\n{:#?}\n\nactual:{:#?}\n\n",
expected_summary,
actual_summary
);
}
}
#[tokio::test]
@ -2929,23 +2935,23 @@ mod tests {
let summary = &chunk_summaries[0];
assert_eq!(summary.id, 0, "summary; {:#?}", summary);
assert!(
summary.time_of_first_write.unwrap() > start,
summary.time_of_first_write > start,
"summary; {:#?}",
summary
);
assert!(
summary.time_of_first_write.unwrap() < after_close,
summary.time_of_first_write < after_close,
"summary; {:#?}",
summary
);
assert!(
summary.time_of_last_write.unwrap() > after_first_write,
summary.time_of_last_write > after_first_write,
"summary; {:#?}",
summary
);
assert!(
summary.time_of_last_write.unwrap() < after_close,
summary.time_of_last_write < after_close,
"summary; {:#?}",
summary
);
@ -2963,8 +2969,8 @@ mod tests {
}
fn assert_first_last_times_eq(chunk_summary: &ChunkSummary) {
let first_write = chunk_summary.time_of_first_write.unwrap();
let last_write = chunk_summary.time_of_last_write.unwrap();
let first_write = chunk_summary.time_of_first_write;
let last_write = chunk_summary.time_of_last_write;
assert_eq!(first_write, last_write);
}
@ -2974,8 +2980,8 @@ mod tests {
before: DateTime<Utc>,
after: DateTime<Utc>,
) {
let first_write = chunk_summary.time_of_first_write.unwrap();
let last_write = chunk_summary.time_of_last_write.unwrap();
let first_write = chunk_summary.time_of_first_write;
let last_write = chunk_summary.time_of_last_write;
assert!(before < first_write);
assert!(before < last_write);
@ -2984,8 +2990,8 @@ mod tests {
}
fn assert_chunks_times_ordered(before: &ChunkSummary, after: &ChunkSummary) {
let before_last_write = before.time_of_last_write.unwrap();
let after_first_write = after.time_of_first_write.unwrap();
let before_last_write = before.time_of_last_write;
let after_first_write = after.time_of_first_write;
assert!(before_last_write < after_first_write);
}
@ -2996,14 +3002,14 @@ mod tests {
}
fn assert_chunks_first_times_eq(a: &ChunkSummary, b: &ChunkSummary) {
let a_first_write = a.time_of_first_write.unwrap();
let b_first_write = b.time_of_first_write.unwrap();
let a_first_write = a.time_of_first_write;
let b_first_write = b.time_of_first_write;
assert_eq!(a_first_write, b_first_write);
}
fn assert_chunks_last_times_eq(a: &ChunkSummary, b: &ChunkSummary) {
let a_last_write = a.time_of_last_write.unwrap();
let b_last_write = b.time_of_last_write.unwrap();
let a_last_write = a.time_of_last_write;
let b_last_write = b.time_of_last_write;
assert_eq!(a_last_write, b_last_write);
}
@ -3165,49 +3171,62 @@ mod tests {
assert_first_last_times_eq(&open_mb_t8);
assert_first_last_times_between(&open_mb_t8, time7, time8);
let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let lifecycle_action = None;
let expected = vec![
ChunkSummary::new_without_timestamps(
Arc::from("1970-01-01T00"),
Arc::from("cpu"),
2,
ChunkStorage::ReadBufferAndObjectStore,
ChunkSummary {
partition_key: Arc::from("1970-01-01T00"),
table_name: Arc::from("cpu"),
id: 2,
storage: ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
3332, // size of RB and OS chunks
1523, // size of parquet file
2,
),
ChunkSummary::new_without_timestamps(
Arc::from("1970-01-05T15"),
Arc::from("cpu"),
0,
ChunkStorage::ClosedMutableBuffer,
memory_bytes: 3332, // size of RB and OS chunks
object_store_bytes: 1523, // size of parquet file
row_count: 2,
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
table_name: Arc::from("cpu"),
id: 0,
storage: ChunkStorage::ClosedMutableBuffer,
lifecycle_action,
2510,
0, // no OS chunks
1,
),
ChunkSummary::new_without_timestamps(
Arc::from("1970-01-05T15"),
Arc::from("cpu"),
1,
ChunkStorage::OpenMutableBuffer,
memory_bytes: 2510,
object_store_bytes: 0, // no OS chunks
row_count: 1,
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
table_name: Arc::from("cpu"),
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
87,
0, // no OS chunks
1,
),
memory_bytes: 87,
object_store_bytes: 0, // no OS chunks
row_count: 1,
time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
];
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert_eq!(
expected_summary, actual_summary,
assert!(
expected_summary.equal_without_timestamps(&actual_summary),
"\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
all expected:\n{:#?}\n\nall actual:\n{:#?}",
expected_summary, actual_summary, expected, chunk_summaries
expected_summary,
actual_summary,
expected,
chunk_summaries
);
}


@ -1,16 +1,15 @@
use crate::db::catalog::metrics::{StorageGauge, TimestampHistogram};
use chrono::{DateTime, Utc};
use data_types::instant::to_approximate_datetime;
use data_types::write_summary::TimestampSummary;
use data_types::{
chunk_metadata::{
ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage, ChunkSummary,
DetailedChunkSummary,
},
instant::to_approximate_datetime,
partition_metadata::TableSummary,
write_summary::TimestampSummary,
};
use internal_types::access::AccessRecorder;
use internal_types::schema::Schema;
use internal_types::{access::AccessRecorder, schema::Schema};
use metrics::{Counter, Histogram, KeyValue};
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
@ -201,12 +200,12 @@ pub struct CatalogChunk {
/// The earliest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into
time_of_first_write: Option<DateTime<Utc>>,
time_of_first_write: DateTime<Utc>,
/// The latest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into
time_of_last_write: Option<DateTime<Utc>>,
time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself
@ -270,8 +269,9 @@ impl CatalogChunk {
) -> Self {
assert_eq!(chunk.table_name(), &addr.table_name);
let first_write = chunk.table_summary().time_of_first_write;
let last_write = chunk.table_summary().time_of_last_write;
let summary = chunk.table_summary();
let time_of_first_write = summary.time_of_first_write;
let time_of_last_write = summary.time_of_last_write;
let stage = ChunkStage::Open { mb_chunk: chunk };
@ -285,8 +285,8 @@ impl CatalogChunk {
lifecycle_action: None,
metrics,
access_recorder: Default::default(),
time_of_first_write: Some(first_write),
time_of_last_write: Some(last_write),
time_of_first_write,
time_of_last_write,
time_closed: None,
};
chunk.update_metrics();
@ -303,8 +303,8 @@ impl CatalogChunk {
metrics: ChunkMetrics,
) -> Self {
let summary = chunk.table_summary();
let first_write = summary.time_of_first_write;
let last_write = summary.time_of_last_write;
let time_of_first_write = summary.time_of_first_write;
let time_of_last_write = summary.time_of_last_write;
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
@ -324,8 +324,8 @@ impl CatalogChunk {
lifecycle_action: None,
metrics,
access_recorder: Default::default(),
time_of_first_write: Some(first_write),
time_of_last_write: Some(last_write),
time_of_first_write,
time_of_last_write,
time_closed: None,
};
chunk.update_metrics();
@ -342,8 +342,8 @@ impl CatalogChunk {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
let summary = chunk.table_summary();
let first_write = summary.time_of_first_write;
let last_write = summary.time_of_last_write;
let time_of_first_write = summary.time_of_first_write;
let time_of_last_write = summary.time_of_last_write;
// this is temporary
let table_summary = TableSummary {
@ -369,8 +369,8 @@ impl CatalogChunk {
lifecycle_action: None,
metrics,
access_recorder: Default::default(),
time_of_first_write: Some(first_write),
time_of_last_write: Some(last_write),
time_of_first_write,
time_of_last_write,
time_closed: None,
};
chunk.update_metrics();
@ -412,11 +412,11 @@ impl CatalogChunk {
.map_or(false, |action| action.metadata() == &lifecycle_action)
}
pub fn time_of_first_write(&self) -> Option<DateTime<Utc>> {
pub fn time_of_first_write(&self) -> DateTime<Utc> {
self.time_of_first_write
}
pub fn time_of_last_write(&self) -> Option<DateTime<Utc>> {
pub fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write
}
@ -479,18 +479,10 @@ impl CatalogChunk {
self.metrics.timestamp_histogram.add(timestamps);
self.access_recorder.record_access_now();
if let Some(t) = self.time_of_first_write {
self.time_of_first_write = Some(t.min(time_of_write))
} else {
self.time_of_first_write = Some(time_of_write)
}
self.time_of_first_write = self.time_of_first_write.min(time_of_write);
// DateTime<Utc> isn't necessarily monotonic
if let Some(t) = self.time_of_last_write {
self.time_of_last_write = Some(t.max(time_of_write))
} else {
self.time_of_last_write = Some(time_of_write)
}
self.time_of_last_write = self.time_of_last_write.max(time_of_write);
self.update_metrics();
}
@ -969,6 +961,7 @@ impl CatalogChunk {
#[cfg(test)]
mod tests {
use super::*;
use entry::test_helpers::lp_to_entry;
use mutable_buffer::chunk::{ChunkMetrics as MBChunkMetrics, MBChunk};
use parquet_file::{
@ -976,8 +969,6 @@ mod tests {
test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store},
};
use super::*;
#[test]
fn test_new_open() {
let addr = chunk_addr();
@ -986,8 +977,6 @@ mod tests {
let mb_chunk = make_mb_chunk(&addr.table_name);
let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
assert!(chunk.time_of_first_write.is_some());
assert!(chunk.time_of_last_write.is_some());
}
#[tokio::test]

View File

@ -5,6 +5,14 @@ use parking_lot::Mutex;
use std::sync::Arc;
use tracker::{LockTracker, RwLock};
const TIMESTAMP_METRICS_ENABLE_ENV: &str = "INFLUXDB_IOX_ROW_TIMESTAMP_METRICS";
fn report_timestamp_metrics(table_name: &str) -> bool {
std::env::var(TIMESTAMP_METRICS_ENABLE_ENV)
.ok()
.map(|x| x.split(',').any(|x| x == table_name))
.unwrap_or(false)
}
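// Sketch, not part of this diff: report_timestamp_metrics is private, so a check like this
// would live in this module. It shows the comma-separated, opt-in matching used above.
#[test]
fn timestamp_metrics_env_var_is_an_allow_list() {
    std::env::set_var(TIMESTAMP_METRICS_ENABLE_ENV, "cpu,disk");
    assert!(report_timestamp_metrics("cpu"));
    assert!(report_timestamp_metrics("disk"));
    assert!(!report_timestamp_metrics("mem")); // tables not listed stay unregistered
}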
#[derive(Debug)]
pub struct CatalogMetrics {
/// Metrics domain
@ -79,11 +87,13 @@ impl CatalogMetrics {
);
let timestamp_histogram = Default::default();
self.metrics_domain.register_observer(
None,
&[KeyValue::new("table", table_name.to_string())],
&timestamp_histogram,
);
if report_timestamp_metrics(table_name) {
self.metrics_domain.register_observer(
None,
&[KeyValue::new("table", table_name.to_string())],
&timestamp_histogram,
);
}
TableMetrics {
metrics_domain: Arc::clone(&self.metrics_domain),


@ -5,8 +5,8 @@ use chrono::{DateTime, Utc};
use data_types::partition_metadata;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use internal_types::access::AccessRecorder;
use internal_types::{
access::AccessRecorder,
schema::{sort::SortKey, Schema},
selection::Selection,
};
@ -612,8 +612,8 @@ mod tests {
let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read();
assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly);
let first_write = chunk.time_of_first_write().unwrap();
let last_write = chunk.time_of_last_write().unwrap();
let first_write = chunk.time_of_first_write();
let last_write = chunk.time_of_last_write();
assert_eq!(first_write, last_write);
assert!(before_creation < first_write);
assert!(last_write < after_creation);


@ -1,20 +1,23 @@
use std::fmt::Display;
use std::sync::Arc;
use std::time::Instant;
use chrono::{DateTime, TimeZone, Utc};
use super::DbChunk;
use crate::{
db::catalog::{chunk::CatalogChunk, partition::Partition},
Db,
};
use ::lifecycle::LifecycleDb;
use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules;
use data_types::error::ErrorLogger;
use data_types::job::Job;
use data_types::partition_metadata::Statistics;
use data_types::DatabaseName;
use chrono::{DateTime, TimeZone, Utc};
use data_types::{
chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules,
error::ErrorLogger,
job::Job,
partition_metadata::Statistics,
DatabaseName,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::access::AccessMetrics;
use internal_types::schema::merge::SchemaMerger;
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::{
access::AccessMetrics,
schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME},
};
use lifecycle::{
LifecycleChunk, LifecyclePartition, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition,
@ -22,12 +25,9 @@ use lifecycle::{
use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta;
use std::{fmt::Display, sync::Arc, time::Instant};
use tracker::{RwLock, TaskTracker};
use crate::db::catalog::chunk::CatalogChunk;
use crate::db::catalog::partition::Partition;
use crate::Db;
pub(crate) use compact::compact_chunks;
pub(crate) use drop::drop_chunk;
pub(crate) use error::{Error, Result};
@ -35,8 +35,6 @@ pub(crate) use move_chunk::move_chunk_to_read_buffer;
pub(crate) use persist::persist_chunks;
pub(crate) use unload::unload_read_buffer_chunk;
use super::DbChunk;
mod compact;
mod drop;
mod error;
@ -312,7 +310,7 @@ impl LifecycleChunk for CatalogChunk {
self.access_recorder().get_metrics()
}
fn time_of_last_write(&self) -> Option<DateTime<Utc>> {
fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write()
}


@ -48,17 +48,15 @@ pub(crate) fn compact_chunks(
input_rows += chunk.table_summary().total_count();
time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) {
(Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)),
(Some(only), None) | (None, Some(only)) => Some(only),
(None, None) => None,
};
let candidate_first = chunk.time_of_first_write();
time_of_first_write = time_of_first_write
.map(|prev_first| prev_first.min(candidate_first))
.or(Some(candidate_first));
time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) {
(Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)),
(Some(only), None) | (None, Some(only)) => Some(only),
(None, None) => None,
};
let candidate_last = chunk.time_of_last_write();
time_of_last_write = time_of_last_write
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
chunk.set_compacting(&registration)?;
Ok(DbChunk::snapshot(&*chunk))
@ -179,15 +177,15 @@ mod tests {
chunk_summaries.sort_unstable();
let mub_summary = &chunk_summaries[0];
let first_mub_write = mub_summary.time_of_first_write.unwrap();
let last_mub_write = mub_summary.time_of_last_write.unwrap();
let first_mub_write = mub_summary.time_of_first_write;
let last_mub_write = mub_summary.time_of_last_write;
assert!(time2 < first_mub_write);
assert_eq!(first_mub_write, last_mub_write);
assert!(first_mub_write < time3);
let rub_summary = &chunk_summaries[1];
let first_rub_write = rub_summary.time_of_first_write.unwrap();
let last_rub_write = rub_summary.time_of_last_write.unwrap();
let first_rub_write = rub_summary.time_of_first_write;
let last_rub_write = rub_summary.time_of_last_write;
assert!(time0 < first_rub_write);
assert!(first_rub_write < last_rub_write);
assert!(last_rub_write < time1);


@ -54,17 +54,15 @@ pub fn persist_chunks(
input_rows += chunk.table_summary().total_count();
time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) {
(Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)),
(Some(only), None) | (None, Some(only)) => Some(only),
(None, None) => None,
};
let candidate_first = chunk.time_of_first_write();
time_of_first_write = time_of_first_write
.map(|prev_first| prev_first.min(candidate_first))
.or(Some(candidate_first));
time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) {
(Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)),
(Some(only), None) | (None, Some(only)) => Some(only),
(None, None) => None,
};
let candidate_last = chunk.time_of_last_write();
time_of_last_write = time_of_last_write
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
chunk.set_writing_to_object_store(&registration)?;
query_chunks.push(DbChunk::snapshot(&*chunk));
@ -168,19 +166,16 @@ pub fn persist_chunks(
#[cfg(test)]
mod tests {
use std::num::{NonZeroU32, NonZeroU64};
use std::time::Instant;
use super::*;
use crate::{db::test_helpers::write_lp, utils::TestDb};
use chrono::{TimeZone, Utc};
use data_types::database_rules::LifecycleRules;
use lifecycle::{LockableChunk, LockablePartition};
use query::QueryDatabase;
use crate::db::test_helpers::write_lp;
use crate::utils::TestDb;
use super::*;
use std::{
num::{NonZeroU32, NonZeroU64},
time::Instant,
};
#[tokio::test]
async fn test_flush_overlapping() {


@ -7,26 +7,20 @@
//!
//! For example `SELECT * FROM system.chunks`
use std::any::Any;
use std::sync::Arc;
use super::catalog::Catalog;
use crate::JobRegistry;
use arrow::{
datatypes::{Field, Schema, SchemaRef},
error::Result,
record_batch::RecordBatch,
};
use chrono::{DateTime, Utc};
use datafusion::{
catalog::schema::SchemaProvider,
datasource::{datasource::Statistics, TableProvider},
error::{DataFusionError, Result as DataFusionResult},
physical_plan::{memory::MemoryExec, ExecutionPlan},
};
use crate::JobRegistry;
use super::catalog::Catalog;
use std::{any::Any, sync::Arc};
mod chunks;
mod columns;
@ -158,11 +152,6 @@ where
}
}
// TODO: Use a custom proc macro or serde to reduce the boilerplate
fn time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
time.map(|ts| ts.timestamp_nanos())
}
/// Creates a DataFusion ExecutionPlan node that scans a single batch
/// of records.
fn scan_batch(
@ -205,11 +194,10 @@ fn scan_batch(
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{ArrayRef, UInt64Array};
use arrow_util::assert_batches_eq;
use super::*;
fn seq_array(start: u64, end: u64) -> ArrayRef {
Arc::new(UInt64Array::from_iter_values(start..end))
}


@ -1,16 +1,14 @@
use crate::db::{catalog::Catalog, system_tables::IoxSystemTable};
use arrow::{
array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
error::Result,
record_batch::RecordBatch,
};
use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger};
use std::sync::Arc;
use arrow::array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::ChunkSummary;
use data_types::error::ErrorLogger;
use crate::db::catalog::Catalog;
use crate::db::system_tables::{time_to_ts, IoxSystemTable};
/// Implementation of system.chunks table
#[derive(Debug)]
pub(super) struct ChunksTable {
@ -50,12 +48,21 @@ fn chunk_summaries_schema() -> SchemaRef {
Field::new("object_store_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_last_access", ts.clone(), true),
Field::new("time_of_first_write", ts.clone(), true),
Field::new("time_of_last_write", ts.clone(), true),
Field::new("time_of_first_write", ts.clone(), false),
Field::new("time_of_last_write", ts.clone(), false),
Field::new("time_closed", ts, true),
]))
}
// TODO: Use a custom proc macro or serde to reduce the boilerplate
fn optional_time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
time.and_then(time_to_ts)
}
fn time_to_ts(ts: DateTime<Utc>) -> Option<i64> {
Some(ts.timestamp_nanos())
}
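// Sketch, not part of this diff: the two helpers mirror the schema above, where
// time_of_last_access/time_closed stay nullable while the write times are always present.
#[test]
fn write_times_always_map_to_a_column_value() {
    use chrono::TimeZone;
    assert_eq!(
        time_to_ts(Utc.timestamp_nanos(10_000_000_000)),
        Some(10_000_000_000)
    );
    assert_eq!(optional_time_to_ts(None), None); // nullable columns pass None through
}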
fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let id = chunks.iter().map(|c| Some(c.id)).collect::<UInt32Array>();
let partition_key = chunks
@ -89,7 +96,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
let time_of_last_access = chunks
.iter()
.map(|c| c.time_of_last_access)
.map(time_to_ts)
.map(optional_time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_of_first_write = chunks
.iter()
@ -104,7 +111,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
let time_closed = chunks
.iter()
.map(|c| c.time_closed)
.map(time_to_ts)
.map(optional_time_to_ts)
.collect::<TimestampNanosecondArray>();
RecordBatch::try_new(
@ -128,12 +135,10 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
use super::*;
use arrow_util::assert_batches_eq;
use chrono::{TimeZone, Utc};
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
#[test]
fn test_from_chunk_summaries() {
@ -148,8 +153,8 @@ mod tests {
object_store_bytes: 0,
row_count: 11,
time_of_last_access: None,
time_of_first_write: Some(Utc.timestamp_nanos(10_000_000_000)),
time_of_last_write: None,
time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
time_closed: None,
},
ChunkSummary {
@ -162,8 +167,8 @@ mod tests {
object_store_bytes: 0,
row_count: 22,
time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)),
time_of_first_write: None,
time_of_last_write: Some(Utc.timestamp_nanos(80_000_000_000)),
time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
time_closed: None,
},
ChunkSummary {
@ -176,8 +181,8 @@ mod tests {
object_store_bytes: 5678,
row_count: 33,
time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)),
time_of_first_write: Some(Utc.timestamp_nanos(100_000_000_000)),
time_of_last_write: Some(Utc.timestamp_nanos(200_000_000_000)),
time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
time_closed: None,
},
];
@ -186,8 +191,8 @@ mod tests {
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | | |",
"| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | | 1970-01-01T00:01:20Z | |",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | |",
"| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | |",
"| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
];
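The expected rows above are then checked against the rendered batch, roughly as follows; the schema value comes from the system-table schema helper outside this hunk, so this is a sketch rather than the exact test tail:
// Sketch of the assertion; `schema` is assumed to come from the
// chunks system-table schema helper not shown in this diff.
let batch = from_chunk_summaries(schema, chunks).unwrap();
assert_batches_eq!(&expected, &[batch]);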

View File

@ -1,17 +1,16 @@
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::error::ErrorLogger;
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use crate::db::catalog::Catalog;
use crate::db::system_tables::IoxSystemTable;
use crate::db::{catalog::Catalog, system_tables::IoxSystemTable};
use arrow::{
array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
error::Result,
record_batch::RecordBatch,
};
use data_types::{
chunk_metadata::DetailedChunkSummary,
error::ErrorLogger,
partition_metadata::{PartitionSummary, TableSummary},
};
use std::{collections::HashMap, sync::Arc};
/// Implementation of `system.columns` system table
#[derive(Debug)]
@ -217,11 +216,13 @@ fn assemble_chunk_columns(
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use super::*;
use arrow_util::assert_batches_eq;
use chrono::{TimeZone, Utc};
use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
};
#[test]
fn test_from_partition_summaries() {
@ -317,8 +318,8 @@ mod tests {
object_store_bytes: 0,
row_count: 11,
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
},
columns: vec![
@ -353,8 +354,8 @@ mod tests {
object_store_bytes: 0,
row_count: 11,
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
},
columns: vec![ChunkColumnSummary {
@ -383,8 +384,8 @@ mod tests {
object_store_bytes: 0,
row_count: 11,
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None,
},
columns: vec![ChunkColumnSummary {

View File

@ -106,7 +106,7 @@ impl ServerFixture {
Some(server) => server,
None => {
// if not, create one
let server = TestServer::new();
let server = TestServer::new(Default::default());
let server = Arc::new(server);
// ensure the server is ready
@ -125,10 +125,17 @@ impl ServerFixture {
/// Create a new server fixture and wait for it to be ready. This
/// is called "create" rather than new because it is async and
/// waits. The database is left unconfigured (no writer id) and
/// is not shared with any other tests.
pub async fn create_single_use() -> Self {
let server = TestServer::new();
Self::create_single_use_with_env(Default::default()).await
}
/// Create a new server fixture with the provided additional environment variables
/// and wait for it to be ready. The database is left unconfigured (no writer id)
/// and is not shared with any other tests.
pub async fn create_single_use_with_env(env: Vec<(String, String)>) -> Self {
let server = TestServer::new(env);
let server = Arc::new(server);
// ensure the server is ready
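A minimal usage sketch for the new constructor; the variable name below is a placeholder, not a real IOx setting (the actual in-tree use is the metrics test added later in this PR):
// Hypothetical test setup: isolated server plus one extra env var.
let env = vec![("SOME_FEATURE_FLAG".to_string(), "enabled".to_string())];
let fixture = ServerFixture::create_single_use_with_env(env).await;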
@ -256,9 +263,12 @@ struct TestServer {
/// Which ports this server should use
addrs: BindAddresses,
// The temporary directory **must** be last so that it is
// dropped after the database closes.
/// The temporary directory **must** be last so that it is
/// dropped after the database closes.
dir: TempDir,
/// Additional environment variables
env: Vec<(String, String)>,
}
struct Process {
@ -267,19 +277,20 @@ struct Process {
}
impl TestServer {
fn new() -> Self {
fn new(env: Vec<(String, String)>) -> Self {
let addrs = BindAddresses::default();
let ready = Mutex::new(ServerState::Started);
let dir = test_helpers::tmp_dir().unwrap();
let server_process = Mutex::new(Self::create_server_process(&addrs, &dir));
let server_process = Mutex::new(Self::create_server_process(&addrs, &dir, &env));
Self {
ready,
server_process,
addrs,
dir,
env,
}
}
@ -288,11 +299,15 @@ impl TestServer {
let mut server_process = self.server_process.lock().await;
server_process.child.kill().unwrap();
server_process.child.wait().unwrap();
*server_process = Self::create_server_process(&self.addrs, &self.dir);
*server_process = Self::create_server_process(&self.addrs, &self.dir, &self.env);
*ready_guard = ServerState::Started;
}
fn create_server_process(addrs: &BindAddresses, dir: &TempDir) -> Process {
fn create_server_process(
addrs: &BindAddresses,
dir: &TempDir,
env: &[(String, String)],
) -> Process {
// Create a new file each time and keep it around to aid debugging
let (log_file, log_path) = NamedTempFile::new()
.expect("opening log file")
@ -325,6 +340,7 @@ impl TestServer {
.env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr())
.env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr())
.envs(env.iter().map(|(a, b)| (a.as_str(), b.as_str())))
// redirect output to log file
.stdout(stdout_log_file)
.stderr(stderr_log_file)
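The extra variables are layered on top of the fixture's defaults via Command::envs. A standalone illustration of the same borrowing pattern, independent of the fixture code: owned (String, String) pairs are handed to envs as (&str, &str).
use std::process::Command;

// Forward owned pairs as borrowed key/value references.
let extra = vec![("FOO".to_string(), "bar".to_string())];
let mut cmd = Command::new("env");
cmd.envs(extra.iter().map(|(k, v)| (k.as_str(), v.as_str())));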
@ -442,7 +458,7 @@ impl TestServer {
};
}
/// Create a connection channel for the gRPR endpoing
/// Create a connection channel for the gRPC endpoint
async fn grpc_channel(
&self,
) -> influxdb_iox_client::connection::Result<tonic::transport::Channel> {

View File

@ -0,0 +1,39 @@
use crate::common::server_fixture::ServerFixture;
use crate::end_to_end_cases::scenario::Scenario;
#[tokio::test]
pub async fn test_row_timestamp() {
let env = vec![(
"INFLUXDB_IOX_ROW_TIMESTAMP_METRICS".to_string(),
"system".to_string(),
)];
let server_fixture = ServerFixture::create_single_use_with_env(env).await;
let mut management_client = server_fixture.management_client();
management_client.update_server_id(1).await.unwrap();
server_fixture.wait_server_initialized().await;
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
let client = reqwest::Client::new();
let url = format!("{}/metrics", server_fixture.http_base());
let payload = client.get(&url).send().await.unwrap().text().await.unwrap();
let lines: Vec<_> = payload
.trim()
.split('\n')
.filter(|x| x.starts_with("catalog_row_time_seconds_bucket"))
.collect();
let db_name = format!("{}_{}", scenario.org_id_str(), scenario.bucket_id_str());
let db_name_label = format!("db_name=\"{}\"", db_name);
// Should only be enabled for the system table
assert_eq!(lines.len(), 60);
assert!(lines
.iter()
.all(|x| x.contains("table=\"system\"") && x.contains(&db_name_label)));
}
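Each matched line is a Prometheus histogram bucket of the form catalog_row_time_seconds_bucket{db_name="...", table="system", le="..."} N; for example (values illustrative): catalog_row_time_seconds_bucket{db_name="myorg_mybucket",table="system",le="0.1"} 3. The hard-coded 60 is simply the total number of such bucket lines this configuration is expected to expose.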

View File

@ -3,6 +3,7 @@ mod freeze;
mod http;
mod management_api;
mod management_cli;
mod metrics;
mod operations_api;
mod operations_cli;
mod persistence;