From f35a49edd01d00b0513e125425f9118cb36dfaed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 11 Oct 2021 10:23:00 +0100 Subject: [PATCH] refactor: move Sequence to data_types (#2780) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 - data_types/src/lib.rs | 1 + data_types/src/sequence.rs | 14 ++++++++++++++ entry/src/entry.rs | 18 ++---------------- persistence_windows/Cargo.toml | 1 - persistence_windows/src/persistence_windows.rs | 5 +++-- server/src/database.rs | 3 ++- server/src/db.rs | 3 ++- server/src/db/replay.rs | 6 ++++-- server/src/write_buffer.rs | 2 +- write_buffer/src/core.rs | 3 ++- write_buffer/src/kafka.rs | 6 ++++-- write_buffer/src/mock.rs | 3 ++- 13 files changed, 37 insertions(+), 29 deletions(-) create mode 100644 data_types/src/sequence.rs diff --git a/Cargo.lock b/Cargo.lock index 75ee5a16b9..0ab6573722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2802,7 +2802,6 @@ version = "0.1.0" dependencies = [ "chrono", "data_types", - "entry", "internal_types", "observability_deps", "snafu", diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index beb03ce295..12a9d294ff 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -19,6 +19,7 @@ pub mod error; pub mod job; pub mod names; pub mod partition_metadata; +pub mod sequence; pub mod server_id; pub mod timestamp; pub mod write_summary; diff --git a/data_types/src/sequence.rs b/data_types/src/sequence.rs new file mode 100644 index 0000000000..923c92c601 --- /dev/null +++ b/data_types/src/sequence.rs @@ -0,0 +1,14 @@ +#[derive(Debug, Copy, Clone)] +pub struct Sequence { + pub id: u32, + pub number: u64, +} + +impl Sequence { + pub fn new(sequencer_id: u32, sequence_number: u64) -> Self { + Self { + id: sequencer_id, + number: sequence_number, + } + } +} diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 5c515f0a15..45683a1082 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -9,6 +9,8 @@ use ouroboros::self_referencing; use snafu::{OptionExt, ResultExt, Snafu}; use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder}; +use data_types::sequence::Sequence; +use data_types::write_summary::TimestampSummary; use generated_types::influxdata::pbdata::v1 as pb; use influxdb_line_protocol::{FieldValue, ParsedLine}; use internal_types::schema::{ @@ -17,7 +19,6 @@ use internal_types::schema::{ }; use crate::entry_fb; -use data_types::write_summary::TimestampSummary; #[derive(Debug, Snafu)] pub enum Error { @@ -1750,21 +1751,6 @@ pub struct SequencedEntry { sequence_and_producer_ts: Option<(Sequence, DateTime)>, } -#[derive(Debug, Copy, Clone)] -pub struct Sequence { - pub id: u32, - pub number: u64, -} - -impl Sequence { - pub fn new(sequencer_id: u32, sequence_number: u64) -> Self { - Self { - id: sequencer_id, - number: sequence_number, - } - } -} - impl SequencedEntry { pub fn new_from_sequence( sequence: Sequence, diff --git a/persistence_windows/Cargo.toml b/persistence_windows/Cargo.toml index 8384954bd6..8da9219ce0 100644 --- a/persistence_windows/Cargo.toml +++ b/persistence_windows/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] chrono = "0.4" data_types = { path = "../data_types" } -entry = { path = "../entry" } internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } snafu = "0.6.2" diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 4a6f403a4d..cc236e4de5 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -9,8 +9,9 @@ use std::{ use chrono::{DateTime, Duration, Utc}; -use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}; -use entry::Sequence; +use data_types::{ + partition_metadata::PartitionAddr, sequence::Sequence, write_summary::WriteSummary, +}; use internal_types::freezable::{Freezable, FreezeHandle}; use crate::min_max_sequence::MinMaxSequence; diff --git a/server/src/database.rs b/server/src/database.rs index 018a20b4c7..d0543d71fe 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1194,7 +1194,8 @@ mod tests { use data_types::database_rules::{ PartitionTemplate, TemplatePart, WriteBufferConnection, WriteBufferDirection, }; - use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry}; + use data_types::sequence::Sequence; + use entry::{test_helpers::lp_to_entries, SequencedEntry}; use object_store::ObjectStore; use std::{ convert::{TryFrom, TryInto}, diff --git a/server/src/db.rs b/server/src/db.rs index 830029da1c..95ccf07e71 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -24,10 +24,11 @@ use data_types::{ chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkSummary}, database_rules::DatabaseRules, partition_metadata::{PartitionSummary, TableSummary}, + sequence::Sequence, server_id::ServerId, }; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; -use entry::{Entry, Sequence, SequencedEntry, TableBatch}; +use entry::{Entry, SequencedEntry, TableBatch}; use internal_types::schema::Schema; use iox_object_store::IoxObjectStore; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index deb69fbd80..e5e8353237 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -5,7 +5,8 @@ use std::{ }; use chrono::Utc; -use entry::{Sequence, TableBatch}; +use data_types::sequence::Sequence; +use entry::TableBatch; use futures::TryStreamExt; use observability_deps::tracing::info; use persistence_windows::{ @@ -420,11 +421,12 @@ mod tests { use chrono::{DateTime, Utc}; use data_types::{ database_rules::{PartitionTemplate, Partitioner, TemplatePart}, + sequence::Sequence, server_id::ServerId, }; use entry::{ test_helpers::{lp_to_entries, lp_to_entry}, - Sequence, SequencedEntry, + SequencedEntry, }; use object_store::ObjectStore; use persistence_windows::{ diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs index 914ea0f283..7f41afbbfb 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -201,8 +201,8 @@ mod tests { use ::test_helpers::assert_contains; use arrow_util::assert_batches_eq; use data_types::database_rules::{PartitionTemplate, TemplatePart}; + use data_types::sequence::Sequence; use entry::test_helpers::lp_to_entry; - use entry::Sequence; use persistence_windows::min_max_sequence::MinMaxSequence; use query::exec::ExecutionContextProvider; use query::frontend::sql::SqlQueryPlanner; diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 8dbabf628b..55e1da6e99 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -2,7 +2,8 @@ use std::fmt::Debug; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::sequence::Sequence; +use entry::{Entry, SequencedEntry}; use futures::{future::BoxFuture, stream::BoxStream}; /// Generic boxed error type that is used in this crate. diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 8a55990e03..b5de77f1ae 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -8,8 +8,10 @@ use std::{ use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; -use data_types::{database_rules::WriteBufferCreationConfig, server_id::ServerId}; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::{ + database_rules::WriteBufferCreationConfig, sequence::Sequence, server_id::ServerId, +}; +use entry::{Entry, SequencedEntry}; use futures::{FutureExt, StreamExt}; use observability_deps::tracing::{debug, info}; use rdkafka::{ diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index a54130898f..9d96ab35b8 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -11,7 +11,8 @@ use futures::{stream, FutureExt, StreamExt}; use parking_lot::Mutex; use data_types::database_rules::WriteBufferCreationConfig; -use entry::{Entry, Sequence, SequencedEntry}; +use data_types::sequence::Sequence; +use entry::{Entry, SequencedEntry}; use crate::core::{ EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,