From b870242ec7f51360d22be4336eb061689a853c7c Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 21 Apr 2023 16:24:46 -0700 Subject: [PATCH 01/25] chore(idpe-17434): remove utf8-percent encoding on v2 write path, such that it matches v1 writes and onCreate --- data_types/src/namespace_name.rs | 23 ++++++++------------ router/src/server/http/write/multi_tenant.rs | 6 ++--- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/data_types/src/namespace_name.rs b/data_types/src/namespace_name.rs index a89230e9b5..e86dcfb216 100644 --- a/data_types/src/namespace_name.rs +++ b/data_types/src/namespace_name.rs @@ -1,6 +1,5 @@ use std::{borrow::Cow, ops::RangeInclusive}; -use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use thiserror::Error; /// Length constraints for a [`NamespaceName`] name. @@ -123,11 +122,7 @@ impl<'a> NamespaceName<'a> { return Err(OrgBucketMappingError::NoOrgBucketSpecified); } - let prefix: Cow<'_, str> = utf8_percent_encode(org, NON_ALPHANUMERIC).into(); - let suffix: Cow<'_, str> = utf8_percent_encode(bucket, NON_ALPHANUMERIC).into(); - - let db_name = format!("{}_{}", prefix, suffix); - Ok(Self::new(db_name)?) + Ok(Self::new(format!("{}_{}", org, bucket))?) } } @@ -188,34 +183,34 @@ mod tests { #[test] fn test_org_bucket_map_db_contains_underscore() { let got = NamespaceName::from_org_and_bucket("my_org", "bucket").unwrap(); - assert_eq!(got.as_str(), "my%5Forg_bucket"); + assert_eq!(got.as_str(), "my_org_bucket"); let got = NamespaceName::from_org_and_bucket("org", "my_bucket").unwrap(); - assert_eq!(got.as_str(), "org_my%5Fbucket"); + assert_eq!(got.as_str(), "org_my_bucket"); let got = NamespaceName::from_org_and_bucket("org", "my__bucket").unwrap(); - assert_eq!(got.as_str(), "org_my%5F%5Fbucket"); + assert_eq!(got.as_str(), "org_my__bucket"); let got = NamespaceName::from_org_and_bucket("my_org", "my_bucket").unwrap(); - assert_eq!(got.as_str(), "my%5Forg_my%5Fbucket"); + assert_eq!(got.as_str(), "my_org_my_bucket"); } #[test] fn test_org_bucket_map_db_contains_underscore_and_percent() { let got = NamespaceName::from_org_and_bucket("my%5Forg", "bucket").unwrap(); - assert_eq!(got.as_str(), "my%255Forg_bucket"); + assert_eq!(got.as_str(), "my%5Forg_bucket"); let got = NamespaceName::from_org_and_bucket("my%5Forg_", "bucket").unwrap(); - assert_eq!(got.as_str(), "my%255Forg%5F_bucket"); + assert_eq!(got.as_str(), "my%5Forg__bucket"); } #[test] fn test_bad_namespace_name_is_encoded() { let got = NamespaceName::from_org_and_bucket("org", "bucket?").unwrap(); - assert_eq!(got.as_str(), "org_bucket%3F"); + assert_eq!(got.as_str(), "org_bucket?"); let got = NamespaceName::from_org_and_bucket("org!", "bucket").unwrap(); - assert_eq!(got.as_str(), "org%21_bucket"); + assert_eq!(got.as_str(), "org!_bucket"); } #[test] diff --git a/router/src/server/http/write/multi_tenant.rs b/router/src/server/http/write/multi_tenant.rs index f89cc869f1..4cc218ec97 100644 --- a/router/src/server/http/write/multi_tenant.rs +++ b/router/src/server/http/write/multi_tenant.rs @@ -171,7 +171,7 @@ mod tests { namespace, .. }) => { - assert_eq!(namespace.as_str(), "cool%5Fconfusing_bucket"); + assert_eq!(namespace.as_str(), "cool_confusing_bucket"); } ); @@ -193,7 +193,7 @@ mod tests { namespace, .. }) => { - assert_eq!(namespace.as_str(), "cool%27confusing_bucket"); + assert_eq!(namespace.as_str(), "cool'confusing_bucket"); } ); @@ -204,7 +204,7 @@ mod tests { namespace, .. }) => { - assert_eq!(namespace.as_str(), "%5Fcoolconfusing_bucket"); + assert_eq!(namespace.as_str(), "_coolconfusing_bucket"); } ); From daabe9663cb197ea5e6a788536b6492f949ee6c7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 21 Apr 2023 16:36:00 -0700 Subject: [PATCH 02/25] chore(idpe-17434): make restrictive whitelist of chars accepted, for any NamespaceName --- data_types/src/namespace_name.rs | 73 ++++++++++++++----- router/src/server/http/write/multi_tenant.rs | 12 +-- .../server/http/write/single_tenant/mod.rs | 32 ++++---- service_grpc_namespace/src/lib.rs | 11 ++- 4 files changed, 87 insertions(+), 41 deletions(-) diff --git a/data_types/src/namespace_name.rs b/data_types/src/namespace_name.rs index e86dcfb216..040e8ee9d4 100644 --- a/data_types/src/namespace_name.rs +++ b/data_types/src/namespace_name.rs @@ -7,6 +7,13 @@ use thiserror::Error; /// A `RangeInclusive` is a closed interval, covering [1, 64] const LENGTH_CONSTRAINT: RangeInclusive = 1..=64; +/// Whitelisted chars for a [`NamespaceName`] name. +/// +/// '/' | '_' | '-' are utilized by the platforms. +fn is_whitelisted(c: char) -> bool { + c.is_alphanumeric() || matches!(c, '/' | '_' | '-') +} + /// Errors returned when attempting to construct a [`NamespaceName`] from an org /// & bucket string pair. #[derive(Debug, Error)] @@ -40,7 +47,7 @@ pub enum NamespaceNameError { /// The provided namespace name contains an unacceptable character. #[error( "namespace name '{}' contains invalid character, character number {} \ - is a control which is not allowed", + is not whitelisted", name, bad_char_offset )] @@ -91,7 +98,7 @@ impl<'a> NamespaceName<'a> { // // NOTE: If changing these characters, please update the error message // above. - if let Some(bad_char_offset) = name.chars().position(|c| c.is_control()) { + if let Some(bad_char_offset) = name.chars().position(|c| !is_whitelisted(c)) { return Err(NamespaceNameError::BadChars { bad_char_offset, name: name.to_string(), @@ -197,20 +204,32 @@ mod tests { #[test] fn test_org_bucket_map_db_contains_underscore_and_percent() { - let got = NamespaceName::from_org_and_bucket("my%5Forg", "bucket").unwrap(); - assert_eq!(got.as_str(), "my%5Forg_bucket"); + let err = NamespaceName::from_org_and_bucket("my%5Forg", "bucket"); + assert!(matches!( + err, + Err(OrgBucketMappingError::InvalidNamespaceName { .. }) + )); - let got = NamespaceName::from_org_and_bucket("my%5Forg_", "bucket").unwrap(); - assert_eq!(got.as_str(), "my%5Forg__bucket"); + let err = NamespaceName::from_org_and_bucket("my%5Forg_", "bucket"); + assert!(matches!( + err, + Err(OrgBucketMappingError::InvalidNamespaceName { .. }) + )); } #[test] - fn test_bad_namespace_name_is_encoded() { - let got = NamespaceName::from_org_and_bucket("org", "bucket?").unwrap(); - assert_eq!(got.as_str(), "org_bucket?"); + fn test_bad_namespace_name_fails_validation() { + let err = NamespaceName::from_org_and_bucket("org", "bucket?"); + assert!(matches!( + err, + Err(OrgBucketMappingError::InvalidNamespaceName { .. }) + )); - let got = NamespaceName::from_org_and_bucket("org!", "bucket").unwrap(); - assert_eq!(got.as_str(), "org!_bucket"); + let err = NamespaceName::from_org_and_bucket("org!", "bucket"); + assert!(matches!( + err, + Err(OrgBucketMappingError::InvalidNamespaceName { .. }) + )); } #[test] @@ -251,30 +270,50 @@ mod tests { #[test] fn test_bad_chars_null() { let got = NamespaceName::new("example\x00").unwrap_err(); - assert_eq!(got.to_string() , "namespace name 'example\x00' contains invalid character, character number 7 is a control which is not allowed"); + assert_eq!(got.to_string() , "namespace name 'example\x00' contains invalid character, character number 7 is not whitelisted"); } #[test] fn test_bad_chars_high_control() { let got = NamespaceName::new("\u{007f}example").unwrap_err(); - assert_eq!(got.to_string() , "namespace name '\u{007f}example' contains invalid character, character number 0 is a control which is not allowed"); + assert_eq!(got.to_string() , "namespace name '\u{007f}example' contains invalid character, character number 0 is not whitelisted"); } #[test] fn test_bad_chars_tab() { let got = NamespaceName::new("example\tdb").unwrap_err(); - assert_eq!(got.to_string() , "namespace name 'example\tdb' contains invalid character, character number 7 is a control which is not allowed"); + assert_eq!(got.to_string() , "namespace name 'example\tdb' contains invalid character, character number 7 is not whitelisted"); } #[test] fn test_bad_chars_newline() { let got = NamespaceName::new("my_example\ndb").unwrap_err(); - assert_eq!(got.to_string() , "namespace name 'my_example\ndb' contains invalid character, character number 10 is a control which is not allowed"); + assert_eq!(got.to_string() , "namespace name 'my_example\ndb' contains invalid character, character number 10 is not whitelisted"); + } + + #[test] + fn test_bad_chars_whitespace() { + let got = NamespaceName::new("my_example db").unwrap_err(); + assert_eq!(got.to_string() , "namespace name 'my_example db' contains invalid character, character number 10 is not whitelisted"); + } + + #[test] + fn test_bad_chars_single_quote() { + let got = NamespaceName::new("my_example'db").unwrap_err(); + assert_eq!(got.to_string() , "namespace name 'my_example\'db' contains invalid character, character number 10 is not whitelisted"); } #[test] fn test_ok_chars() { - let db = NamespaceName::new("my-example-db_with_underscores and spaces").unwrap(); - assert_eq!(&*db, "my-example-db_with_underscores and spaces"); + let db = + NamespaceName::new("my-example-db_with_underscores/and/fwd/slash/AndCaseSensitive") + .unwrap(); + assert_eq!( + &*db, + "my-example-db_with_underscores/and/fwd/slash/AndCaseSensitive" + ); + + let db = NamespaceName::new("a_ã_京").unwrap(); + assert_eq!(&*db, "a_ã_京"); } } diff --git a/router/src/server/http/write/multi_tenant.rs b/router/src/server/http/write/multi_tenant.rs index 4cc218ec97..734777adb9 100644 --- a/router/src/server/http/write/multi_tenant.rs +++ b/router/src/server/http/write/multi_tenant.rs @@ -78,6 +78,7 @@ fn parse_v2(req: &Request) -> Result #[cfg(test)] mod tests { use assert_matches::assert_matches; + use data_types::NamespaceNameError; use super::*; use crate::server::http::write::Precision; @@ -189,12 +190,11 @@ mod tests { test_parse_v2!( encoded_quotation, query_string = "?org=cool'confusing&bucket=bucket", - want = Ok(WriteParams { - namespace, - .. - }) => { - assert_eq!(namespace.as_str(), "cool'confusing_bucket"); - } + want = Err(Error::MultiTenantError( + MultiTenantExtractError::InvalidOrgAndBucket( + OrgBucketMappingError::InvalidNamespaceName(NamespaceNameError::BadChars { .. }) + ) + )) ); test_parse_v2!( diff --git a/router/src/server/http/write/single_tenant/mod.rs b/router/src/server/http/write/single_tenant/mod.rs index 17853ea93a..b9adfe576b 100644 --- a/router/src/server/http/write/single_tenant/mod.rs +++ b/router/src/server/http/write/single_tenant/mod.rs @@ -381,9 +381,9 @@ mod tests { test_parse_v1!( encoded_quotation, query_string = "?db=ban'anas", - want = Ok(WriteParams{ namespace, precision: _ }) => { - assert_eq!(namespace.as_str(), "ban'anas"); - } + want = Err(Error::SingleTenantError( + SingleTenantExtractError::InvalidNamespace(NamespaceNameError::BadChars { .. }) + )) ); test_parse_v1!( @@ -483,17 +483,24 @@ mod tests { )) ); - // Do not encode potentially problematic input. test_parse_v2!( - no_encoding, + url_encoding, // URL-encoded input that is decoded in the HTTP layer - query_string = "?bucket=cool%2Fconfusing%F0%9F%8D%8C&prg=org", + query_string = "?bucket=cool%2Fconfusing&prg=org", want = Ok(WriteParams {namespace, ..}) => { // Yielding a not-encoded string as the namespace. - assert_eq!(namespace.as_str(), "cool/confusing🍌"); + assert_eq!(namespace.as_str(), "cool/confusing"); } ); + test_parse_v2!( + encoded_emoji, + query_string = "?bucket=confusing%F0%9F%8D%8C&prg=org", + want = Err(Error::SingleTenantError( + SingleTenantExtractError::InvalidNamespace(NamespaceNameError::BadChars { .. }) + )) + ); + test_parse_v2!( org_ignored, query_string = "?org=wat&bucket=bananas", @@ -518,14 +525,11 @@ mod tests { ); test_parse_v2!( - encoded_quotation, + single_quotation, query_string = "?bucket=buc'ket", - want = Ok(WriteParams { - namespace, - .. - }) => { - assert_eq!(namespace.as_str(), "buc'ket"); - } + want = Err(Error::SingleTenantError( + SingleTenantExtractError::InvalidNamespace(NamespaceNameError::BadChars { .. }) + )) ); test_parse_v2!( diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index f624838367..3fd96297bb 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -633,23 +633,26 @@ mod tests { test_create_namespace_name!(ok, name = "bananas", want = Ok("bananas")); - test_create_namespace_name!(multi_byte, name = "🍌", want = Ok("🍌")); + test_create_namespace_name!(multi_byte, name = "🍌", want = Err(e) => { + assert_eq!(e.code(), Code::InvalidArgument); + assert_eq!(e.message(), "namespace name '🍌' contains invalid character, character number 0 is not whitelisted"); + }); test_create_namespace_name!( tab, name = "it\tis\ttabtasitc", want = Err(e) => { assert_eq!(e.code(), Code::InvalidArgument); - assert_eq!(e.message(), "namespace name 'it\tis\ttabtasitc' contains invalid character, character number 2 is a control which is not allowed"); + assert_eq!(e.message(), "namespace name 'it\tis\ttabtasitc' contains invalid character, character number 2 is not whitelisted"); } ); test_create_namespace_name!( null, - name = "bad \0 bananas", + name = "bad\0bananas", want = Err(e) => { assert_eq!(e.code(), Code::InvalidArgument); - assert_eq!(e.message(), "namespace name 'bad \0 bananas' contains invalid character, character number 4 is a control which is not allowed"); + assert_eq!(e.message(), "namespace name 'bad\0bananas' contains invalid character, character number 3 is not whitelisted"); } ); From 2769b523214185beec5ee2f6472a941fc35a9e65 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 24 Apr 2023 13:43:39 +0200 Subject: [PATCH 03/25] feat(ingester): blocked write load shedding This commit bounds the duration of time a write may take to be processed after being added to the WAL for increased reliability. In ideal cases, anything that is added to the WAL is buffered / completed to prevent WAL replay materialising writes that never completed in the first place (#7111), but in some cases (i.e. catalog outage, etc) the write becomes blocked, stuck in a retry loop or otherwise never making progress. When a write is blocked, the data within it remains in RAM, and the overhead of the spawned task retrying also consumes CPU resources. If all writes are blocked for sufficient time, these limitless writes keep building up until a resource is exhausted (i.e. RAM -> OOM) causing an outage of the ingester. Instead, this change allocates a write at most 15 seconds to complete, before it is cancelled and an error returned to the user (if they're still connected) to shed load. The error message "buffer apply request timeout" was chosen to be uniquely identifying - systems have lots of timeouts! Normally the inner.apply() call at this point completes in microseconds, so 15 seconds is more than long enough to avoid shedding legitimate load. --- ingester2/src/dml_sink/trait.rs | 5 ++ ingester2/src/server/grpc/rpc_write.rs | 1 + ingester2/src/wal/wal_sink.rs | 95 ++++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/ingester2/src/dml_sink/trait.rs b/ingester2/src/dml_sink/trait.rs index 9b635de0b3..b46338e766 100644 --- a/ingester2/src/dml_sink/trait.rs +++ b/ingester2/src/dml_sink/trait.rs @@ -16,6 +16,11 @@ pub enum DmlError { /// An error appending the [`DmlOperation`] to the write-ahead log. #[error("wal commit failure: {0}")] Wal(String), + + /// The write has hit an internal timeout designed to prevent writes from + /// retrying indefinitely. + #[error("buffer apply request timeout")] + ApplyTimeout, } /// A [`DmlSink`] handles [`DmlOperation`] instances in some abstract way. diff --git a/ingester2/src/server/grpc/rpc_write.rs b/ingester2/src/server/grpc/rpc_write.rs index 8ced0359ea..00a1b87d63 100644 --- a/ingester2/src/server/grpc/rpc_write.rs +++ b/ingester2/src/server/grpc/rpc_write.rs @@ -63,6 +63,7 @@ impl From for tonic::Status { match e { DmlError::Buffer(e) => map_write_error(e), DmlError::Wal(_) => Self::internal(e.to_string()), + DmlError::ApplyTimeout => Self::internal(e.to_string()), } } } diff --git a/ingester2/src/wal/wal_sink.rs b/ingester2/src/wal/wal_sink.rs index 8fe67fd1ea..f09dd271f9 100644 --- a/ingester2/src/wal/wal_sink.rs +++ b/ingester2/src/wal/wal_sink.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use dml::DmlOperation; use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op; use mutable_batch_pb::encode::encode_write; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::sync::watch::Receiver; use wal::{SequencedWalOp, WriteResult}; @@ -13,6 +13,17 @@ use crate::{ use super::traits::WalAppender; +/// [`DELEGATE_APPLY_TIMEOUT`] defines how long the inner [`DmlSink`] is given +/// to complete the write [`DmlSink::apply()`] call. +/// +/// If this limit weren't enforced, a write that does not make progress would +/// consume resources forever. Instead, a reasonable duration of time is given +/// to attempt the write before an error is returned to the caller. +/// +/// In practice, this limit SHOULD only ever be reached as a symptom of a larger +/// problem (catalog unavailable, etc) preventing a write from making progress. +const DELEGATE_APPLY_TIMEOUT: Duration = Duration::from_secs(15); + /// A [`DmlSink`] decorator that ensures any [`DmlOperation`] is committed to /// the write-ahead log before passing the operation to the inner [`DmlSink`]. #[derive(Debug)] @@ -49,7 +60,11 @@ where // durable. // // Ensure that this future is always driven to completion now that the - // WAL entry is being committed, otherwise they'll diverge. + // WAL entry is being committed, otherwise they'll diverge. At the same + // time, do not allow the spawned task to live forever, consuming + // resources without making progress - instead shed load after a + // reasonable duration of time (DELEGATE_APPLY_TIMEOUT) has passed, + // before returning a write error (if the caller is still listening). // // If this buffer apply fails, the entry remains in the WAL and will be // attempted again during WAL replay after a crash. If this can never @@ -58,9 +73,14 @@ where // https://github.com/influxdata/influxdb_iox/issues/7111 // let inner = self.inner.clone(); - CancellationSafe::new(async move { inner.apply(op).await }) - .await - .map_err(Into::into)?; + CancellationSafe::new(async move { + let res = tokio::time::timeout(DELEGATE_APPLY_TIMEOUT, inner.apply(op)) + .await + .map_err(|_| DmlError::ApplyTimeout)?; + + res.map_err(Into::into) + }) + .await?; // Wait for the write to be durable before returning to the user write_result @@ -101,7 +121,8 @@ impl WalAppender for Arc { #[cfg(test)] mod tests { - use std::sync::Arc; + use core::{future::Future, marker::Send, pin::Pin}; + use std::{future, sync::Arc}; use assert_matches::assert_matches; use data_types::{NamespaceId, PartitionKey, TableId}; @@ -181,4 +202,66 @@ mod tests { assert_eq!(want, *payload); } + + /// A [`DmlSink`] implementation that hangs forever and never completes. + #[derive(Debug, Default, Clone)] + struct BlockingDmlSink; + + impl DmlSink for BlockingDmlSink { + type Error = DmlError; + + fn apply<'life0, 'async_trait>( + &'life0 self, + _op: DmlOperation, + ) -> Pin> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(future::pending()) + } + } + + #[tokio::test] + async fn test_timeout() { + let dir = tempfile::tempdir().unwrap(); + + // Generate the test op + let op = make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 42, + r#"bananas,region=Madrid temp=35 4242424242"#, + ); + + let wal = Wal::new(dir.path()) + .await + .expect("failed to initialise WAL"); + + let wal_sink = WalSink::new(BlockingDmlSink::default(), wal); + + // Allow tokio to automatically advance time past the timeout duration, + // when all threads are blocked on await points. + // + // This allows the test to drive the timeout logic without actually + // waiting for the timeout duration in the test. + tokio::time::pause(); + + let start = tokio::time::Instant::now(); + + // Apply the op through the decorator, which should time out + let err = wal_sink + .apply(DmlOperation::Write(op.clone())) + .await + .expect_err("write should time out"); + + assert_matches!(err, DmlError::ApplyTimeout); + + // Ensure that "time" advanced at least the timeout amount of time + // before erroring. + let duration = tokio::time::Instant::now().duration_since(start); + assert!(duration > DELEGATE_APPLY_TIMEOUT); + } } From 7b7101b53faa5c92360d2495641bb6990cd0b987 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 24 Feb 2023 13:54:20 -0500 Subject: [PATCH 04/25] fix: In DmlMeta, only use SequenceNumber rather than Sequence (shard index plus seq num) --- data_types/src/lib.rs | 19 ------------------- dml/src/lib.rs | 18 +++++++++--------- ingester2/benches/write.rs | 4 ++-- ingester2/src/buffer_tree/namespace.rs | 3 +-- ingester2/src/dml_sink/tracing.rs | 4 ++-- ingester2/src/init/wal_replay.rs | 8 ++------ ingester2/src/server/grpc/rpc_write.rs | 14 +++++--------- ingester2/src/test_util.rs | 13 ++++--------- ingester2/src/wal/wal_sink.rs | 1 - ingester2_test_ctx/src/lib.rs | 8 +++----- 10 files changed, 28 insertions(+), 64 deletions(-) diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 82d3198486..d6006761c3 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -2039,25 +2039,6 @@ impl TableSummary { } } -/// Shard index plus offset -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub struct Sequence { - /// The shard index - pub shard_index: ShardIndex, - /// The sequence number - pub sequence_number: SequenceNumber, -} - -impl Sequence { - /// Create a new Sequence - pub fn new(shard_index: ShardIndex, sequence_number: SequenceNumber) -> Self { - Self { - shard_index, - sequence_number, - } - } -} - /// minimum time that can be represented. /// /// 1677-09-21 00:12:43.145224194 +0000 UTC diff --git a/dml/src/lib.rs b/dml/src/lib.rs index 9cc8a541f3..02849b3254 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -16,8 +16,8 @@ use std::time::Duration; use data_types::{ - DeletePredicate, NamespaceId, NonEmptyString, PartitionKey, Sequence, StatValues, Statistics, - TableId, + DeletePredicate, NamespaceId, NonEmptyString, PartitionKey, SequenceNumber, StatValues, + Statistics, TableId, }; use hashbrown::HashMap; use iox_time::{Time, TimeProvider}; @@ -28,7 +28,7 @@ use trace::ctx::SpanContext; #[derive(Debug, Default, Clone, PartialEq)] pub struct DmlMeta { /// The sequence number associated with this write - sequence: Option, + sequence_number: Option, /// When this write was ingested into the write buffer producer_ts: Option