fix: Move Sequence type to data_types2

pull/24376/head
Carol (Nichols || Goulding) 2022-05-05 10:33:10 -04:00
parent d2671355c3
commit 236edb9181
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
9 changed files with 53 additions and 66 deletions

View File

@ -17,7 +17,6 @@ pub mod error;
pub mod job;
pub mod partition_metadata;
pub mod router;
pub mod sequence;
pub mod server_id;
pub mod timestamp;
pub mod write_buffer;

View File

@ -1,16 +0,0 @@
/// Identifies where an entry landed in the write buffer: which sequencer
/// (Kafka partition) received it and at what offset.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct Sequence {
    /// The sequencer id (kafka partition id)
    pub sequencer_id: u32,
    /// The sequence number (kafka offset)
    pub sequence_number: u64,
}

impl Sequence {
    /// Build a `Sequence` from a sequencer id and a sequence number.
    pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
        Sequence {
            sequencer_id,
            sequence_number,
        }
    }
}

View File

@ -27,10 +27,7 @@ use std::{
};
use uuid::Uuid;
pub use data_types::{
sequence::Sequence,
timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME},
};
pub use data_types::timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
/// Unique ID for a `Namespace`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
@ -1923,6 +1920,25 @@ impl TableSummary {
}
}
/// Kafka partition ID plus offset
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct Sequence {
    /// The sequencer id (kafka partition id)
    pub sequencer_id: u32,
    /// The sequence number (kafka offset)
    pub sequence_number: u64,
}

impl Sequence {
    /// Create a new Sequence
    pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
        Self { sequencer_id, sequence_number }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -11,11 +11,8 @@
clippy::clone_on_ref_ptr
)]
use data_types::{
router::{ShardConfig, ShardId},
sequence::Sequence,
};
use data_types2::{DeletePredicate, NonEmptyString, StatValues, Statistics};
use data_types::router::{ShardConfig, ShardId};
use data_types2::{DeletePredicate, NonEmptyString, Sequence, StatValues, Statistics};
use hashbrown::HashMap;
use iox_time::Time;
use mutable_batch::MutableBatch;

View File

@ -108,6 +108,17 @@
//! [`rename(2)`]: https://man7.org/linux/man-pages/man2/rename.2.html
//! [`symlink(2)`]: https://man7.org/linux/man-pages/man2/symlink.2.html
//! [`unlink(2)`]: https://man7.org/linux/man-pages/man2/unlink.2.html
use crate::{
codec::{ContentType, IoxHeaders},
core::{WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting},
};
use async_trait::async_trait;
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, Stream, StreamExt};
use iox_time::{Time, TimeProvider};
use pin_project::pin_project;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
@ -118,23 +129,10 @@ use std::{
Arc,
},
};
use crate::{
codec::{ContentType, IoxHeaders},
core::WriteBufferStreamHandler,
};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, Stream, StreamExt};
use iox_time::{Time, TimeProvider};
use pin_project::pin_project;
use tokio_util::sync::ReusableBoxFuture;
use trace::TraceCollector;
use uuid::Uuid;
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
/// Header used to declare the creation time of the message.
pub const HEADER_TIME: &str = "last-modified";

View File

@ -1,6 +1,5 @@
use std::sync::Arc;
use data_types::sequence::Sequence;
use crate::codec::{ContentType, IoxHeaders};
use data_types2::Sequence;
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use iox_time::{Time, TimeProvider};
@ -11,14 +10,13 @@ use rskafka::{
record::Record,
};
use schema::selection::Selection;
use std::sync::Arc;
use trace::{
ctx::SpanContext,
span::{Span, SpanRecorder},
TraceCollector,
};
use crate::codec::{ContentType, IoxHeaders};
/// Newtype wrapper for tags given back to the aggregator framework.
///
/// We cannot just use a simple `usize` to get the offsets from the produced records because we can have writes for

View File

@ -10,7 +10,8 @@ use crate::{
},
};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, StreamExt};
use iox_time::{Time, TimeProvider};

View File

@ -1,3 +1,13 @@
use crate::core::{
WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting,
};
use async_trait::async_trait;
use data_types::write_buffer::WriteBufferCreationConfig;
use data_types2::Sequence;
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::{stream::BoxStream, StreamExt};
use iox_time::TimeProvider;
use parking_lot::Mutex;
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
@ -5,19 +15,6 @@ use std::{
task::{Poll, Waker},
};
use async_trait::async_trait;
use futures::{stream::BoxStream, StreamExt};
use parking_lot::Mutex;
use data_types::sequence::Sequence;
use data_types::write_buffer::WriteBufferCreationConfig;
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use iox_time::TimeProvider;
use crate::core::{
WriteBufferError, WriteBufferReading, WriteBufferStreamHandler, WriteBufferWriting,
};
#[derive(Debug, Default)]
struct WriteResVec {
/// The maximum sequence number in the entries

View File

@ -1,14 +1,11 @@
use std::collections::BTreeMap;
use data_types2::{KafkaPartition, SequenceNumber};
use observability_deps::tracing::debug;
use dml::DmlMeta;
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
use dml::DmlMeta;
use observability_deps::tracing::debug;
use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
mod progress;
pub use progress::SequencerProgress;
@ -203,7 +200,7 @@ impl TryFrom<proto::WriteSummary> for WriteSummary {
#[cfg(test)]
mod tests {
use super::*;
use data_types::sequence::Sequence;
use data_types2::Sequence;
#[test]
fn empty() {