From 6387a9576a81016dfbb97bf83c18ec61f5aaa9f7 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 22 Feb 2023 15:01:30 -0500 Subject: [PATCH] fix: Remove the write_summary crate and write info service --- Cargo.lock | 17 - Cargo.toml | 1 - data_types/src/lib.rs | 14 - generated_types/build.rs | 3 - .../iox/ingester/v1/write_info.proto | 57 --- .../iox/write_summary/v1/write_summary.proto | 24 - generated_types/src/lib.rs | 15 - generated_types/src/write_info.rs | 155 ------ influxdb_iox_client/src/client.rs | 3 - influxdb_iox_client/src/client/write_info.rs | 52 -- ioxd_querier/src/lib.rs | 4 - ioxd_querier/src/rpc/mod.rs | 1 - ioxd_querier/src/rpc/write_info.rs | 48 -- ioxd_router/Cargo.toml | 1 - ioxd_router/src/lib.rs | 6 +- querier/src/ingester/mod.rs | 40 +- querier/src/ingester/test_util.rs | 5 - router/Cargo.toml | 1 - router/src/dml_handlers/fan_out.rs | 6 +- router/src/dml_handlers/instrumentation.rs | 7 +- router/src/dml_handlers/mock.rs | 10 +- router/src/dml_handlers/mod.rs | 3 - router/src/dml_handlers/write_summary.rs | 67 --- router/src/server/http.rs | 65 ++- router/tests/common/mod.rs | 12 +- service_grpc_influxrpc/src/id.rs | 20 - write_summary/Cargo.toml | 19 - write_summary/src/lib.rs | 445 ------------------ write_summary/src/progress.rs | 399 ---------------- 29 files changed, 42 insertions(+), 1458 deletions(-) delete mode 100644 generated_types/protos/influxdata/iox/ingester/v1/write_info.proto delete mode 100644 generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto delete mode 100644 generated_types/src/write_info.rs delete mode 100644 influxdb_iox_client/src/client/write_info.rs delete mode 100644 ioxd_querier/src/rpc/write_info.rs delete mode 100644 router/src/dml_handlers/write_summary.rs delete mode 100644 write_summary/Cargo.toml delete mode 100644 write_summary/src/lib.rs delete mode 100644 write_summary/src/progress.rs diff --git a/Cargo.lock b/Cargo.lock index 490f1d541c..e30bd2764c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3233,7 +3233,6 @@ dependencies = [ "tokio-util", "trace", "workspace-hack", - "write_summary", ] [[package]] @@ -4839,7 +4838,6 @@ dependencies = [ "tonic", "trace", "workspace-hack", - "write_summary", ] [[package]] @@ -6866,21 +6864,6 @@ dependencies = [ "zstd-sys", ] -[[package]] -name = "write_summary" -version = "0.1.0" -dependencies = [ - "base64 0.21.0", - "data_types", - "dml", - "generated_types", - "iox_time", - "observability_deps", - "serde_json", - "snafu", - "workspace-hack", -] - [[package]] name = "xz2" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 4cbd657c4a..779eef6713 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,6 @@ members = [ "trogging", "wal", "workspace-hack", - "write_summary", ] default-members = ["influxdb_iox"] diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index e5ced48a78..4c47d51d90 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -2300,20 +2300,6 @@ impl TimestampMinMax { } } -/// Specifies the status of data in the ingestion process. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ShardWriteStatus { - /// Nothing is known about this write (e.g. 
it refers to a shard for which we have no - /// information) - ShardUnknown, - /// The data has not yet been processed by the ingester, and thus is unreadable - Durable, - /// The data is readable, but not yet persisted - Readable, - /// The data is both readable and persisted to parquet - Persisted, -} - #[cfg(test)] mod tests { use std::borrow::Cow; diff --git a/generated_types/build.rs b/generated_types/build.rs index be89851de8..93628288b7 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -47,7 +47,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> { let sharder_path = root.join("influxdata/iox/sharder/v1"); let wal_path = root.join("influxdata/iox/wal/v1"); let write_buffer_path = root.join("influxdata/iox/write_buffer/v1"); - let write_summary_path = root.join("influxdata/iox/write_summary/v1"); let storage_path = root.join("influxdata/platform/storage"); let storage_errors_path = root.join("influxdata/platform/errors"); @@ -59,7 +58,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> { delete_path.join("service.proto"), ingester_path.join("parquet_metadata.proto"), ingester_path.join("query.proto"), - ingester_path.join("write_info.proto"), ingester_path.join("write.proto"), ingester_path.join("replication.proto"), ingester_path.join("persist.proto"), @@ -76,7 +74,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> { sharder_path.join("sharder.proto"), wal_path.join("wal.proto"), write_buffer_path.join("write_buffer.proto"), - write_summary_path.join("write_summary.proto"), storage_path.join("predicate.proto"), storage_path.join("service.proto"), storage_path.join("source.proto"), diff --git a/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto b/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto deleted file mode 100644 index e50eb73307..0000000000 --- a/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto +++ /dev/null @@ -1,57 +0,0 @@ -syntax = "proto3"; -package influxdata.iox.ingester.v1; -option go_package = "github.com/influxdata/iox/ingester/v1"; - -// NOTE: This is an ALPHA / Internal API that is used as part of the -// end to end tests. -// -// A public API is tracked here: -// -service WriteInfoService { - // Get information about a particular write - rpc GetWriteInfo(GetWriteInfoRequest) returns (GetWriteInfoResponse); -} - -message GetWriteInfoRequest { - // The write token returned from a write that was written to one or - // more shards - string write_token = 1; -} - -message GetWriteInfoResponse { - // Renamed from kafka_partition_infos to shard_infos - reserved 3; - reserved "kafka_partition_infos"; - - // Information for all shards in this write - repeated ShardInfo shard_infos = 4; -} - -// Status of a part of a write in a particular shard -message ShardInfo { - // Unique shard index - int32 shard_index = 1; - - // the status of the data for this shard - ShardStatus status = 2; -} - -// the state -enum ShardStatus { - // Unspecified status, will result in an error. - SHARD_STATUS_UNSPECIFIED = 0; - - // The ingester has not yet processed data in this write - SHARD_STATUS_DURABLE = 1; - - // The ingester has processed the data in this write and it is - // readable (will be included in a query response)? - SHARD_STATUS_READABLE = 2; - - // The ingester has processed the data in this write and it is both - // readable and completely persisted to parquet files. 
-  SHARD_STATUS_PERSISTED = 3;
-
-  // The ingester does not have information about this shard
-  SHARD_STATUS_UNKNOWN = 4;
-}
diff --git a/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto b/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto
deleted file mode 100644
index 6c79019835..0000000000
--- a/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto
+++ /dev/null
@@ -1,24 +0,0 @@
-syntax = "proto3";
-package influxdata.iox.write_summary.v1;
-option go_package = "github.com/influxdata/iox/write_summary/v1";
-
-// Represents a single logical write that was partitioned and sharded
-// into multiple pieces in multiple shards (kafka partitions)
-message WriteSummary {
-  // Renamed from sequencers to shards
-  reserved 1;
-  reserved "sequencers";
-
-  // per shard index (kafka partition) information
-  repeated ShardWrite shards = 2;
-}
-
-// Per shard (kafka partition) information about what sequence
-// numbers contain part of a write
-message ShardWrite {
-  // Unique shard index (kafka partition).
-  int32 shard_index = 1;
-
-  // Which sequence numbers for this shard had data
-  repeated int64 sequence_numbers = 2;
-}
diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs
index 5716d7c0cd..5f179a595d 100644
--- a/generated_types/src/lib.rs
+++ b/generated_types/src/lib.rs
@@ -196,19 +196,6 @@ pub mod influxdata {
                 ));
             }
         }
-
-        pub mod write_summary {
-            pub mod v1 {
-                include!(concat!(
-                    env!("OUT_DIR"),
-                    "/influxdata.iox.write_summary.v1.rs"
-                ));
-                include!(concat!(
-                    env!("OUT_DIR"),
-                    "/influxdata.iox.write_summary.v1.serde.rs"
-                ));
-            }
-        }
     }
 
     pub mod pbdata {
@@ -281,8 +268,6 @@
 pub mod compactor;
 pub mod delete_predicate;
 #[cfg(any(feature = "data_types_conversions", test))]
 pub mod ingester;
-#[cfg(any(feature = "data_types_conversions", test))]
-pub mod write_info;
 
 pub use prost::{DecodeError, EncodeError};
 
diff --git a/generated_types/src/write_info.rs b/generated_types/src/write_info.rs
deleted file mode 100644
index d8a4f89dfa..0000000000
--- a/generated_types/src/write_info.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-use crate::influxdata::iox::ingester::v1 as proto;
-use data_types::ShardWriteStatus;
-use std::collections::HashMap;
-
-impl From<ShardWriteStatus> for proto::ShardStatus {
-    fn from(status: ShardWriteStatus) -> Self {
-        match status {
-            ShardWriteStatus::ShardUnknown => Self::Unknown,
-            ShardWriteStatus::Durable => Self::Durable,
-            ShardWriteStatus::Readable => Self::Readable,
-            ShardWriteStatus::Persisted => Self::Persisted,
-        }
-    }
-}
-
-impl proto::ShardStatus {
-    /// Convert the status to a number such that higher numbers are later in the data lifecycle.
-    /// For use in merging multiple write status gRPC responses into one response.
-    fn status_order(&self) -> u8 {
-        match self {
-            Self::Unspecified => panic!("Unspecified status"),
-            Self::Unknown => 0,
-            Self::Durable => 1,
-            Self::Readable => 2,
-            Self::Persisted => 3,
-        }
-    }
-}
-
-impl proto::ShardInfo {
-    fn merge(&mut self, other: &Self) {
-        let self_status = self.status();
-        let other_status = other.status();
-
-        let new_status = match self_status.status_order().cmp(&other_status.status_order()) {
-            std::cmp::Ordering::Less => other_status,
-            std::cmp::Ordering::Equal => self_status,
-            std::cmp::Ordering::Greater => self_status,
-        };
-
-        self.set_status(new_status);
-    }
-}
-
-/// "Merges" the partition information for write info responses so that the "most recent"
-/// information is returned.
-pub fn merge_responses(
-    responses: impl IntoIterator<Item = proto::GetWriteInfoResponse>,
-) -> proto::GetWriteInfoResponse {
-    // Map shard index to status
-    let mut shard_infos: HashMap<_, proto::ShardInfo> = HashMap::new();
-
-    responses
-        .into_iter()
-        .flat_map(|res| res.shard_infos.into_iter())
-        .for_each(|info| {
-            shard_infos
-                .entry(info.shard_index)
-                .and_modify(|existing_info| existing_info.merge(&info))
-                .or_insert(info);
-        });
-
-    let shard_infos = shard_infos.into_values().collect();
-
-    proto::GetWriteInfoResponse { shard_infos }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use proto::{ShardInfo, ShardStatus};
-
-    #[test]
-    fn test_merge() {
-        #[derive(Debug)]
-        struct Test<'a> {
-            left: &'a ShardInfo,
-            right: &'a ShardInfo,
-            expected: &'a ShardInfo,
-        }
-
-        let durable = ShardInfo {
-            shard_index: 1,
-            status: ShardStatus::Durable.into(),
-        };
-
-        let readable = ShardInfo {
-            shard_index: 1,
-            status: ShardStatus::Readable.into(),
-        };
-
-        let persisted = ShardInfo {
-            shard_index: 1,
-            status: ShardStatus::Persisted.into(),
-        };
-
-        let unknown = ShardInfo {
-            shard_index: 1,
-            status: ShardStatus::Unknown.into(),
-        };
-
-        let tests = vec![
-            Test {
-                left: &unknown,
-                right: &unknown,
-                expected: &unknown,
-            },
-            Test {
-                left: &unknown,
-                right: &durable,
-                expected: &durable,
-            },
-            Test {
-                left: &unknown,
-                right: &readable,
-                expected: &readable,
-            },
-            Test {
-                left: &durable,
-                right: &unknown,
-                expected: &durable,
-            },
-            Test {
-                left: &readable,
-                right: &readable,
-                expected: &readable,
-            },
-            Test {
-                left: &durable,
-                right: &durable,
-                expected: &durable,
-            },
-            Test {
-                left: &readable,
-                right: &durable,
-                expected: &readable,
-            },
-            Test {
-                left: &persisted,
-                right: &durable,
-                expected: &persisted,
-            },
-        ];
-
-        for test in tests {
-            let mut output = test.left.clone();
-
-            output.merge(test.right);
-            assert_eq!(
-                &output, test.expected,
-                "Mismatch\n\nOutput:\n{output:#?}\n\nTest:\n{test:#?}"
-            );
-        }
-    }
-}
diff --git a/influxdb_iox_client/src/client.rs b/influxdb_iox_client/src/client.rs
index bf74be63c2..7d93924a5e 100644
--- a/influxdb_iox_client/src/client.rs
+++ b/influxdb_iox_client/src/client.rs
@@ -36,8 +36,5 @@ pub mod store;
 /// Client for testing purposes.
 pub mod test;
 
-/// Client for fetching write info
-pub mod write_info;
-
 /// Client for write API
 pub mod write;
diff --git a/influxdb_iox_client/src/client/write_info.rs b/influxdb_iox_client/src/client/write_info.rs
deleted file mode 100644
index 1778d4be0c..0000000000
--- a/influxdb_iox_client/src/client/write_info.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use client_util::connection::GrpcConnection;
-
-use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *};
-
-use crate::connection::Connection;
-use crate::error::Error;
-
-/// Re-export generated_types
-pub mod generated_types {
-    pub use generated_types::influxdata::iox::ingester::v1::{
-        write_info_service_client, write_info_service_server, GetWriteInfoRequest,
-        GetWriteInfoResponse, ShardInfo, ShardStatus,
-    };
-    pub use generated_types::write_info::merge_responses;
-}
-
-/// A basic client for fetching information about write tokens from a
-/// single ingester.
-///
-/// NOTE: This is an ALPHA / Internal API that is used as part of the
-/// end to end tests.
-///
-/// A public API is tracked here:
-///
-#[derive(Debug, Clone)]
-pub struct Client {
-    inner: WriteInfoServiceClient<GrpcConnection>,
-}
-
-impl Client {
-    /// Creates a new client with the provided connection
-    pub fn new(connection: Connection) -> Self {
-        Self {
-            inner: WriteInfoServiceClient::new(connection.into_grpc_connection()),
-        }
-    }
-
-    /// Get the write information for a write token
-    pub async fn get_write_info(
-        &mut self,
-        write_token: &str,
-    ) -> Result<GetWriteInfoResponse, Error> {
-        let response = self
-            .inner
-            .get_write_info(GetWriteInfoRequest {
-                write_token: write_token.to_string(),
-            })
-            .await?;
-
-        Ok(response.into_inner())
-    }
-}
diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs
index 1d27810743..bcaa8edcaf 100644
--- a/ioxd_querier/src/lib.rs
+++ b/ioxd_querier/src/lib.rs
@@ -98,10 +98,6 @@ impl ServerType for QuerierServer
             builder,
             rpc::namespace::namespace_service(Arc::clone(&self.database))
         );
-        add_service!(
-            builder,
-            rpc::write_info::write_info_service(Arc::clone(&self.database))
-        );
         add_service!(builder, self.server.handler().schema_service());
         add_service!(builder, self.server.handler().catalog_service());
         add_service!(builder, self.server.handler().object_store_service());
diff --git a/ioxd_querier/src/rpc/mod.rs b/ioxd_querier/src/rpc/mod.rs
index f3e28368dc..1bb1c407f6 100644
--- a/ioxd_querier/src/rpc/mod.rs
+++ b/ioxd_querier/src/rpc/mod.rs
@@ -1,3 +1,2 @@
 pub(crate) mod namespace;
 pub(crate) mod query;
-pub(crate) mod write_info;
diff --git a/ioxd_querier/src/rpc/write_info.rs b/ioxd_querier/src/rpc/write_info.rs
deleted file mode 100644
index 74a804a3f9..0000000000
--- a/ioxd_querier/src/rpc/write_info.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-//! WriteInfoService gRPC implementation
-
-use generated_types::influxdata::iox::ingester::v1::{
-    self as proto,
-    write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
-};
-use querier::QuerierDatabase;
-use std::sync::Arc;
-
-/// Acquire a [`WriteInfoService`] gRPC service implementation.
-pub fn write_info_service(
-    server: Arc<QuerierDatabase>,
-) -> WriteInfoServiceServer<impl WriteInfoService> {
-    WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server))
-}
-
-#[derive(Debug)]
-struct QuerierWriteInfoServiceImpl {
-    server: Arc<QuerierDatabase>,
-}
-
-impl QuerierWriteInfoServiceImpl {
-    pub fn new(server: Arc<QuerierDatabase>) -> Self {
-        Self { server }
-    }
-}
-
-#[tonic::async_trait]
-impl WriteInfoService for QuerierWriteInfoServiceImpl {
-    async fn get_write_info(
-        &self,
-        request: tonic::Request<proto::GetWriteInfoRequest>,
-    ) -> Result<tonic::Response<proto::GetWriteInfoResponse>, tonic::Status> {
-        let proto::GetWriteInfoRequest { write_token } = request.into_inner();
-
-        let ingester_connection = self
-            .server
-            .ingester_connection()
-            .expect("Ingester connections must be configured to get write info");
-
-        let progresses = ingester_connection
-            .get_write_info(&write_token)
-            .await
-            .map_err(|e| tonic::Status::internal(e.to_string()))?;
-
-        Ok(tonic::Response::new(progresses))
-    }
-}
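The service deleted above was consumed by the equally short-lived client removed from influxdb_iox_client; a rough sketch of how an end-to-end test polled it (assuming the same connection Builder used by the deleted querier code below; error handling elided):

    use influxdb_iox_client::{connection::Builder, write_info::Client};

    async fn print_write_status(addr: &str, token: &str) -> Result<(), Box<dyn std::error::Error>> {
        let connection = Builder::new().build(addr).await?;
        let mut client = Client::new(connection);
        let info = client.get_write_info(token).await?;
        for shard in info.shard_infos {
            // `status` is the i32 representation of ShardStatus.
            println!("shard {}: status {}", shard.shard_index, shard.status);
        }
        Ok(())
    }
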
diff --git a/ioxd_router/Cargo.toml b/ioxd_router/Cargo.toml
index 7bca172c83..60bd848a43 100644
--- a/ioxd_router/Cargo.toml
+++ b/ioxd_router/Cargo.toml
@@ -24,4 +24,3 @@
 tokio = { version = "1.27", features = ["macros", "net", "parking_lot", "rt-multi-thread"] }
 tokio-util = { version = "0.7.7" }
 trace = { path = "../trace" }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
-write_summary = { path = "../write_summary" }
diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs
index 970f3ec012..2125486e10 100644
--- a/ioxd_router/src/lib.rs
+++ b/ioxd_router/src/lib.rs
@@ -27,7 +27,6 @@ use router::{
     dml_handlers::{
         lazy_connector::LazyConnector, DmlHandler, DmlHandlerChainExt, FanOutAdaptor,
         InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
-        WriteSummaryAdapter,
     },
     namespace_cache::{
         metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ReadThroughCache,
@@ -55,7 +54,6 @@ use std::{
 use thiserror::Error;
 use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
-use write_summary::WriteSummary;
 
 #[derive(Debug, Error)]
 pub enum Error {
@@ -96,7 +94,7 @@ impl<D, N> std::fmt::Debug for RpcWriteRouterServerType<D, N> {
 #[async_trait]
 impl<D, N> ServerType for RpcWriteRouterServerType<D, N>
 where
-    D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
+    D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = ()> + 'static,
     N: NamespaceResolver + 'static,
 {
     /// Return the [`metric::Registry`] used by the router.
@@ -318,7 +316,7 @@ pub async fn create_router2_server_type(
     ////////////////////////////////////////////////////////////////////////////
     //
     // # Parallel writer
-    let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(rpc_writer));
+    let parallel_write = FanOutAdaptor::new(rpc_writer);
 
     // # Handler stack
     //
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 7167ee1ba5..834e0631ac 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -18,11 +18,7 @@ use data_types::{
 };
 use datafusion::error::DataFusionError;
 use futures::{stream::FuturesUnordered, TryStreamExt};
-use generated_types::{
-    influxdata::iox::ingester::v1::GetWriteInfoResponse,
-    ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest},
-    write_info::merge_responses,
-};
+use generated_types::ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest};
 use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata;
 use iox_query::{
     exec::{stringset::StringSet, IOxSessionContext},
@@ -223,10 +219,6 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
         span: Option<Span>,
     ) -> Result<Vec<IngesterPartition>>;
 
-    /// Returns the most recent partition status info across all ingester(s) for the specified
-    /// write token.
-    async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse>;
-
     /// Return backend as [`Any`] which can be used to downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
 }
@@ -990,41 +982,11 @@ impl IngesterConnection for IngesterConnectionImpl {
         Ok(ingester_partitions)
     }
 
-    async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
-        let responses = self
-            .unique_ingester_addresses
-            .iter()
-            .map(|ingester_address| execute_get_write_infos(ingester_address, write_token))
-            .collect::<FuturesUnordered<_>>()
-            .try_collect::<Vec<_>>()
-            .await?;
-
-        Ok(merge_responses(responses))
-    }
-
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
 }
 
-async fn execute_get_write_infos(
-    ingester_address: &str,
-    write_token: &str,
-) -> Result<GetWriteInfoResponse> {
-    let connection = connection::Builder::new()
-        .build(ingester_address)
-        .await
-        .context(ConnectingSnafu { ingester_address })?;
-
-    influxdb_iox_client::write_info::Client::new(connection)
-        .get_write_info(write_token)
-        .await
-        .context(WriteInfoSnafu {
-            ingester_address,
-            write_token,
-        })
-}
-
 /// A wrapper around the unpersisted data in a partition returned by
 /// the ingester that (will) implement the `QueryChunk` interface
 ///
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index 7c4acd2a20..0d38689b1d 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -4,7 +4,6 @@ use super::IngesterConnection;
 use async_trait::async_trait;
 use data_types::NamespaceId;
 use data_types::ShardIndex;
-use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
 use iox_query::util::create_basic_summary;
 use parking_lot::Mutex;
 use schema::Projection;
@@ -116,10 +115,6 @@ impl IngesterConnection for MockIngesterConnection {
         Ok(partitions)
     }
 
-    async fn get_write_info(&self, _write_token: &str) -> super::Result<GetWriteInfoResponse> {
-        unimplemented!()
-    }
-
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
diff --git a/router/Cargo.toml b/router/Cargo.toml
index 14437c8ec5..650937cfea 100644
--- a/router/Cargo.toml
+++ b/router/Cargo.toml
@@ -46,7 +46,6 @@
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
 tonic = "0.8"
 trace = { path = "../trace/" }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
-write_summary = { path = "../write_summary" }
 
 [dev-dependencies]
 assert_matches = "1.5"
diff --git a/router/src/dml_handlers/fan_out.rs b/router/src/dml_handlers/fan_out.rs
index 0c4ddd22bb..dd501a02f8 100644
--- a/router/src/dml_handlers/fan_out.rs
+++ b/router/src/dml_handlers/fan_out.rs
@@ -40,7 +40,7 @@ where
     U: Iterator<Item = T::WriteInput> + Send + Sync,
 {
     type WriteInput = I;
-    type WriteOutput = Vec<T::WriteOutput>;
+    type WriteOutput = ();
 
     type WriteError = T::WriteError;
     type DeleteError = T::DeleteError;
@@ -54,7 +54,7 @@ where
         input: Self::WriteInput,
         span_ctx: Option<SpanContext>,
     ) -> Result<Self::WriteOutput, Self::WriteError> {
-        let results = input
+        input
             .into_iter()
             .map(|v| {
                 let namespace = namespace.clone();
@@ -68,7 +68,7 @@ where
             .collect::<FuturesUnordered<_>>()
             .try_collect::<Vec<_>>()
             .await?;
-        Ok(results)
+        Ok(())
     }
 
     /// Pass the delete through to the inner handler.
diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs
index be513d094a..ff93ecc9ab 100644
--- a/router/src/dml_handlers/instrumentation.rs
+++ b/router/src/dml_handlers/instrumentation.rs
@@ -147,7 +147,6 @@ mod tests {
     use data_types::TimestampRange;
     use metric::Attributes;
     use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector};
-    use write_summary::WriteSummary;
 
     use super::*;
     use crate::dml_handlers::{mock::MockDmlHandler, DmlError};
@@ -191,14 +190,10 @@ mod tests {
         );
     }
 
-    fn summary() -> WriteSummary {
-        WriteSummary::default()
-    }
-
     #[tokio::test]
     async fn test_write_ok() {
         let ns = "platanos".try_into().unwrap();
-        let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
+        let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
         let metrics = Arc::new(metric::Registry::default());
         let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
diff --git a/router/src/dml_handlers/mock.rs b/router/src/dml_handlers/mock.rs
index e3f4c317ff..a2856e0a46 100644
--- a/router/src/dml_handlers/mock.rs
+++ b/router/src/dml_handlers/mock.rs
@@ -4,7 +4,6 @@
 use async_trait::async_trait;
 use data_types::{DeletePredicate, NamespaceId, NamespaceName};
 use parking_lot::Mutex;
 use trace::ctx::SpanContext;
-use write_summary::WriteSummary;
 
 use super::{DmlError, DmlHandler};
 
@@ -28,7 +27,7 @@ pub enum MockDmlHandlerCall<W> {
 #[derive(Debug)]
 struct Inner<W> {
     calls: Vec<MockDmlHandlerCall<W>>,
-    write_return: VecDeque<Result<WriteSummary, DmlError>>,
+    write_return: VecDeque<Result<(), DmlError>>,
     delete_return: VecDeque<Result<(), DmlError>>,
 }
 
@@ -61,10 +60,7 @@ impl<W> MockDmlHandler<W>
 where
     W: Clone,
 {
-    pub fn with_write_return(
-        self,
-        ret: impl Into<VecDeque<Result<WriteSummary, DmlError>>>,
-    ) -> Self {
+    pub fn with_write_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
         self.0.lock().write_return = ret.into();
         self
     }
@@ -99,7 +95,7 @@ where
     type WriteError = DmlError;
     type DeleteError = DmlError;
     type WriteInput = W;
-    type WriteOutput = WriteSummary;
+    type WriteOutput = ();
 
     async fn write(
         &self,
diff --git a/router/src/dml_handlers/mod.rs b/router/src/dml_handlers/mod.rs
index a8a0345f09..acbccb6cb9 100644
--- a/router/src/dml_handlers/mod.rs
+++ b/router/src/dml_handlers/mod.rs
@@ -68,8 +68,5 @@
 pub use fan_out::*;
 
 mod rpc_write;
 pub use rpc_write::*;
 
-mod write_summary;
-pub use self::write_summary::*;
-
 #[cfg(test)]
 pub mod mock;
diff --git a/router/src/dml_handlers/write_summary.rs b/router/src/dml_handlers/write_summary.rs
deleted file mode 100644
index bbf9d0be16..0000000000
--- a/router/src/dml_handlers/write_summary.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use std::fmt::Debug;
-
-use async_trait::async_trait;
-use data_types::{DeletePredicate, NamespaceId, NamespaceName};
-use dml::DmlMeta;
-use trace::ctx::SpanContext;
-use write_summary::WriteSummary;
-
-use super::DmlHandler;
-
-/// A [`WriteSummaryAdapter`] wraps a DML handler that produces
-/// `Vec<Vec<DmlMeta>>` for each write, and produces a [`WriteSummary`]
-/// suitable for sending back to a client.
-#[derive(Debug, Default)]
-pub struct WriteSummaryAdapter<T> {
-    inner: T,
-}
-
-impl<T> WriteSummaryAdapter<T> {
-    /// Construct a [`WriteSummaryAdapter`] that passes DML operations to `inner`
-    /// concurrently.
-    pub fn new(inner: T) -> Self {
-        Self { inner }
-    }
-}
-
-#[async_trait]
-impl<T> DmlHandler for WriteSummaryAdapter<T>
-where
-    T: DmlHandler<WriteOutput = Vec<Vec<DmlMeta>>>,
-{
-    type WriteInput = T::WriteInput;
-    type WriteOutput = WriteSummary;
-    type WriteError = T::WriteError;
-    type DeleteError = T::DeleteError;
-
-    /// Sends `input` to the inner handler, which returns a
-    /// `Vec<Vec<DmlMeta>>`, and creates a `WriteSummary` from it.
-    async fn write(
-        &self,
-        namespace: &NamespaceName<'static>,
-        namespace_id: NamespaceId,
-        input: Self::WriteInput,
-        span_ctx: Option<SpanContext>,
-    ) -> Result<Self::WriteOutput, Self::WriteError> {
-        let metas = self
-            .inner
-            .write(namespace, namespace_id, input, span_ctx)
-            .await?;
-        Ok(WriteSummary::new(metas))
-    }
-
-    /// Pass the delete through to the inner handler.
-    async fn delete(
-        &self,
-        namespace: &NamespaceName<'static>,
-        namespace_id: NamespaceId,
-        table_name: &str,
-        predicate: &DeletePredicate,
-        span_ctx: Option<SpanContext>,
-    ) -> Result<(), Self::DeleteError> {
-        self.inner
-            .delete(namespace, namespace_id, table_name, predicate, span_ctx)
-            .await
-    }
-}
diff --git a/router/src/server/http.rs b/router/src/server/http.rs
index 185006a4e0..928c4b52a7 100644
--- a/router/src/server/http.rs
+++ b/router/src/server/http.rs
@@ -19,7 +19,6 @@
 use std::{str::Utf8Error, sync::Arc, time::Instant};
 use thiserror::Error;
 use tokio::sync::{Semaphore, TryAcquireError};
 use trace::ctx::SpanContext;
-use write_summary::WriteSummary;
 
 use self::{
     delete_predicate::parse_http_delete_request,
@@ -35,8 +34,6 @@ use crate::{
     namespace_resolver::NamespaceResolver,
 };
 
-const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token";
-
 /// Errors returned by the `router` HTTP request handler.
 #[derive(Debug, Error)]
 pub enum Error {
@@ -312,7 +309,7 @@ impl<D, N, T> HttpDelegate<D, N, T> {
 
 impl<D, N, T> HttpDelegate<D, N, T>
 where
-    D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary>,
+    D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = ()>,
     N: NamespaceResolver,
     T: TimeProvider,
 {
@@ -349,10 +346,9 @@ where
             (&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
             _ => return Err(Error::NoHandler),
         }
-        .map(|summary| {
+        .map(|_summary| {
             Response::builder()
                 .status(StatusCode::NO_CONTENT)
-                .header(WRITE_TOKEN_HTTP_HEADER, summary.to_token())
                 .body(Body::empty())
                 .unwrap()
         })
@@ -362,7 +358,7 @@ where
         &self,
         req: Request<Body>,
         write_info: WriteParams,
-    ) -> Result<WriteSummary, Error> {
+    ) -> Result<(), Error> {
         let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
 
         let token = req
@@ -406,7 +402,7 @@ where
             Ok(v) => v,
             Err(mutable_batch_lp::Error::EmptyPayload) => {
                 debug!("nothing to write");
-                return Ok(WriteSummary::default());
+                return Ok(());
             }
             Err(e) => return Err(Error::ParseLineProtocol(e)),
         };
@@ -431,8 +427,7 @@ where
             .get_namespace_id(&write_info.namespace)
             .await?;
 
-        let summary = self
-            .dml_handler
+        self.dml_handler
             .write(&write_info.namespace, namespace_id, batches, span_ctx)
             .await
             .map_err(Into::into)?;
@@ -442,10 +437,10 @@ where
         self.write_metric_tables.inc(num_tables as _);
         self.write_metric_body_size.inc(body.len() as _);
 
-        Ok(summary)
+        Ok(())
     }
 
-    async fn delete_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
+    async fn delete_handler(&self, req: Request<Body>) -> Result<(), Error> {
         let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
 
         let write_info = self.write_param_extractor.parse_v2(&req)?;
@@ -491,9 +486,7 @@ where
 
         self.delete_metric_body_size.inc(body.len() as _);
 
-        // TODO pass back write summaries for deletes as well
-        // https://github.com/influxdata/influxdb_iox/issues/4209
-        Ok(WriteSummary::default())
+        Ok(())
     }
 
     /// Parse the request's body into raw bytes, applying the configured size
@@ -589,10 +582,6 @@ mod tests {
     const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
     static NAMESPACE_NAME: &str = "bananas_test";
 
-    fn summary() -> WriteSummary {
-        WriteSummary::default()
-    }
-
    fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option<u64>) {
        let counter = metrics
            .get_instrument::<Metric<U64Counter>>(name)
 
         ok,
         query_string = "?org=bananas&bucket=test",
         body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
         }
     );
 
         ok_precision_s,
         query_string = "?org=bananas&bucket=test&precision=s",
         body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         ok_precision_ms,
         query_string = "?org=bananas&bucket=test&precision=ms",
         body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         ok_precision_us,
         query_string = "?org=bananas&bucket=test&precision=us",
         body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         ok_precision_ns,
         query_string = "?org=bananas&bucket=test&precision=ns",
         body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         // SECONDS, so multiplies the provided timestamp by 1,000,000,000
         query_string = "?org=bananas&bucket=test&precision=s",
         body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::ParseLineProtocol(_)),
         want_dml_calls = []
     );
 
         no_query_params,
         query_string = "",
         body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::MultiTenantError(
             MultiTenantExtractError::ParseV2Request(V2WriteParseError::NoQueryParams)
         )),
         want_dml_calls = [] // None
     );
 
         no_org_bucket,
         query_string = "?",
         body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::MultiTenantError(
             MultiTenantExtractError::InvalidOrgAndBucket(
                 OrgBucketMappingError::NoOrgBucketSpecified
             )
         )),
         want_dml_calls = [] // None
     );
 
         empty_org_bucket,
         query_string = "?org=&bucket=",
         body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::MultiTenantError(
             MultiTenantExtractError::InvalidOrgAndBucket(
                 OrgBucketMappingError::NoOrgBucketSpecified
             )
         )),
         want_dml_calls = [] // None
     );
 
         invalid_org_bucket,
         query_string = format!("?org=test&bucket={}", "A".repeat(1000)),
         body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::MultiTenantError(
             MultiTenantExtractError::InvalidOrgAndBucket(
                 OrgBucketMappingError::InvalidNamespaceName(
                 )
             )
         )),
         want_dml_calls = [] // None
     );
 
         invalid_line_protocol,
         query_string = "?org=bananas&bucket=test",
         body = "not line protocol".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::ParseLineProtocol(_)),
         want_dml_calls = [] // None
     );
 
         non_utf8_body,
         query_string = "?org=bananas&bucket=test",
         body = vec![0xc3, 0x28],
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::NonUtf8Body(_)),
         want_dml_calls = [] // None
     );
 
                 .flat_map(|s| s.bytes())
                 .collect::<Vec<u8>>()
         },
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Err(Error::RequestSizeExceeded(_)),
         want_dml_calls = [] // None
     );
 
         field_upsert_within_batch,
         query_string = "?org=bananas&bucket=test",
         body = "test field=1u 100\ntest field=2u 100".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         duplicate_fields_same_value,
         query_string = "?org=bananas&bucket=test",
         body = "whydo InputPower=300i,InputPower=300i".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         duplicate_fields_different_value,
         query_string = "?org=bananas&bucket=test",
         body = "whydo InputPower=300i,InputPower=42i".as_bytes(),
-        dml_handler = [Ok(summary())],
+        dml_handler = [Ok(())],
         want_result = Ok(_),
         want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
             assert_eq!(namespace, NAMESPACE_NAME);
 
         let dml_handler = Arc::new(
             MockDmlHandler::default()
-                .with_write_return([Ok(summary())])
+                .with_write_return([Ok(())])
                 .with_delete_return([]),
         );
         let metrics = Arc::new(metric::Registry::default());
 
         let dml_handler = Arc::new(
             MockDmlHandler::default()
-                .with_write_return([Ok(summary())])
+                .with_write_return([Ok(())])
                 .with_delete_return([]),
         );
         let metrics = Arc::new(metric::Registry::default());
 
         let dml_handler = Arc::new(
             MockDmlHandler::default()
-                .with_write_return([Ok(summary()), Ok(summary()), Ok(summary())])
+                .with_write_return([Ok(()), Ok(()), Ok(())])
                 .with_delete_return([]),
         );
         let metrics = Arc::new(metric::Registry::default());
diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs
index 0f5f642c52..169840a3d4 100644
--- a/router/tests/common/mod.rs
+++ b/router/tests/common/mod.rs
@@ -12,7 +12,7 @@ use router::{
     dml_handlers::{
         client::mock::MockWriteClient, Chain, DmlHandlerChainExt, FanOutAdaptor,
         InstrumentationDecorator, Partitioned, Partitioner, RetentionValidator, RpcWrite,
-        SchemaValidator, WriteSummaryAdapter,
+        SchemaValidator,
     },
     namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache},
     namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
@@ -127,11 +127,9 @@ type HttpDelegateStack = HttpDelegate<
         >,
         Partitioner,
     >,
-    WriteSummaryAdapter<
-        FanOutAdaptor<
-            RpcWrite<Arc<MockWriteClient>>,
-            Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
-        >,
+    FanOutAdaptor<
+        RpcWrite<Arc<MockWriteClient>>,
+        Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
     >,
 >,
 >,
@@ -180,7 +178,7 @@ impl TestContext {
             namespace_autocreation,
         );
 
-        let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(rpc_writer));
+        let parallel_write = FanOutAdaptor::new(rpc_writer);
 
         let handler_stack = retention_validator
             .and_then(schema_validator)
diff --git a/service_grpc_influxrpc/src/id.rs b/service_grpc_influxrpc/src/id.rs
index 17d8b3ac81..e03ce15e1e 100644
--- a/service_grpc_influxrpc/src/id.rs
+++ b/service_grpc_influxrpc/src/id.rs
@@ -97,7 +97,6 @@ impl From<Id> for String {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use serde::Deserialize;
     use std::convert::TryInto;
 
     #[test]
@@ -169,23 +168,4 @@ mod tests {
             assert_eq!(expected_output, actual_output);
         }
     }
-
-    #[test]
-    fn test_deserialize_then_to_string() {
-        let i: Id = "0000111100001111".parse().unwrap();
-        assert_eq!(Id(NonZeroU64::new(18_764_712_120_593).unwrap()), i);
-
-        #[derive(Deserialize)]
-        struct WriteInfo {
-            org: Id,
-        }
-
-        let query = "org=0000111100001111";
-        let write_info: WriteInfo = serde_urlencoded::from_str(query).unwrap();
-        assert_eq!(
-            Id(NonZeroU64::new(18_764_712_120_593).unwrap()),
-            write_info.org
-        );
-        assert_eq!("0000111100001111", write_info.org.to_string());
-    }
 }
diff --git a/write_summary/Cargo.toml b/write_summary/Cargo.toml
deleted file mode 100644
index 9f90d8d80c..0000000000
--- a/write_summary/Cargo.toml
+++ /dev/null
@@ -1,19 +0,0 @@
-[package]
-name = "write_summary"
-version.workspace = true
-authors.workspace = true
-edition.workspace = true
-license.workspace = true
-
-[dependencies]
-base64 = "0.21"
-data_types = { path = "../data_types" }
-dml = { path = "../dml" }
-generated_types = { path = "../generated_types" }
-observability_deps = { path = "../observability_deps" }
-serde_json = "1.0.95"
-snafu = "0.7"
-workspace-hack = { version = "0.1", path = "../workspace-hack" }
-
-[dev-dependencies]
-iox_time = { path = "../iox_time" }
diff --git a/write_summary/src/lib.rs b/write_summary/src/lib.rs
deleted file mode 100644
index 52376a57fb..0000000000
--- a/write_summary/src/lib.rs
+++ /dev/null
@@ -1,445 +0,0 @@
-use base64::{prelude::BASE64_STANDARD, Engine};
-use data_types::{SequenceNumber, ShardIndex, ShardWriteStatus};
-use dml::DmlMeta;
-/// Protobuf to/from conversion
-use generated_types::influxdata::iox::write_summary::v1 as proto;
-use observability_deps::tracing::debug;
-use snafu::{OptionExt, Snafu};
-use std::collections::BTreeMap;
-
-mod progress;
-pub use progress::ShardProgress;
-
-#[derive(Debug, Snafu, PartialEq, Eq)]
-pub enum Error {
-    #[snafu(display("Unknown shard index: {}", shard_index))]
-    UnknownShard { shard_index: ShardIndex },
-}
-
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
-/// Contains information about a single write.
-///
-/// A single write consisting of multiple lines of line protocol
-/// formatted data is sharded and partitioned across potentially
-/// several shards, which are then processed by the ingester to
-/// become readable at potentially different times.
-///
-/// This struct contains sufficient information to determine the
-/// current state of the write as a whole.
-#[derive(Debug, Default, Clone, PartialEq, Eq)]
-/// Summary of a `Vec<Vec<DmlMeta>>`
-pub struct WriteSummary {
-    /// Key is the shard index from the `DmlMeta` structure (aka kafka
-    /// partition id), value is the sequence numbers from that
-    /// shard.
-    ///
-    /// Note: `BTreeMap` to ensure the output is in a consistent order
-    shards: BTreeMap<ShardIndex, Vec<SequenceNumber>>,
-}
-
-impl WriteSummary {
-    pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
-        debug!(?metas, "Creating write summary");
-        let sequences = metas
-            .iter()
-            .flat_map(|v| v.iter())
-            .filter_map(|meta| meta.sequence());
-
-        let mut shards = BTreeMap::new();
-        for s in sequences {
-            let shard_index = s.shard_index;
-            let sequence_number = s.sequence_number;
-
-            shards
-                .entry(shard_index)
-                .or_insert_with(Vec::new)
-                .push(sequence_number)
-        }
-
-        Self { shards }
-    }
-
-    /// Return an opaque summary "token" of this summary
-    pub fn to_token(self) -> String {
-        let proto_write_summary: proto::WriteSummary = self.into();
-        BASE64_STANDARD.encode(
-            serde_json::to_string(&proto_write_summary)
-                .expect("unexpected error serializing token to json"),
-        )
-    }
-
-    /// Return a [`WriteSummary`] from the "token" (created with [`Self::to_token`]),
-    /// or an error if that is not possible
-    pub fn try_from_token(token: &str) -> Result<Self, String> {
-        let data = BASE64_STANDARD
-            .decode(token)
-            .map_err(|e| format!("Invalid write token, invalid base64: {e}"))?;
-
-        let json = String::from_utf8(data)
-            .map_err(|e| format!("Invalid write token, non utf8 data in write token: {e}"))?;
-
-        let proto = serde_json::from_str::<proto::WriteSummary>(&json)
-            .map_err(|e| format!("Invalid write token, protobuf decode error: {e}"))?;
-
-        proto
-            .try_into()
-            .map_err(|e| format!("Invalid write token, invalid content: {e}"))
-    }
-
-    /// Return which shard indexes from the write buffer were present in this write summary
-    pub fn shard_indexes(&self) -> Vec<ShardIndex> {
-        self.shards.keys().cloned().collect()
-    }
-
-    /// Given the write described by this summary, and the shard's progress for a particular
-    /// shard index, returns the status of that write in this write summary
-    pub fn write_status(
-        &self,
-        shard_index: ShardIndex,
-        progress: &ShardProgress,
-    ) -> Result<ShardWriteStatus> {
-        let sequence_numbers = self
-            .shards
-            .get(&shard_index)
-            .context(UnknownShardSnafu { shard_index })?;
-
-        debug!(?shard_index, ?progress, ?sequence_numbers, "write_status");
-
-        if progress.is_empty() {
-            return Ok(ShardWriteStatus::ShardUnknown);
-        }
-
-        let is_persisted = sequence_numbers
-            .iter()
-            .all(|sequence_number| progress.persisted(*sequence_number));
-
-        if is_persisted {
-            return Ok(ShardWriteStatus::Persisted);
-        }
-
-        let is_readable = sequence_numbers
-            .iter()
-            .all(|sequence_number| progress.readable(*sequence_number));
-
-        if is_readable {
-            return Ok(ShardWriteStatus::Readable);
-        }
-
-        Ok(ShardWriteStatus::Durable)
-    }
-}
-
-impl From<WriteSummary> for proto::WriteSummary {
-    fn from(summary: WriteSummary) -> Self {
-        let shards = summary
-            .shards
-            .into_iter()
-            .map(|(shard_index, sequence_numbers)| proto::ShardWrite {
-                shard_index: shard_index.get(),
-                sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(),
-            })
-            .collect();
-
-        Self { shards }
-    }
-}
-
-impl TryFrom<proto::WriteSummary> for WriteSummary {
-    type Error = String;
-
-    fn try_from(summary: proto::WriteSummary) -> Result<Self, Self::Error> {
-        let shards = summary
-            .shards
-            .into_iter()
-            .map(
-                |proto::ShardWrite {
-                     shard_index,
-                     sequence_numbers,
-                 }| {
-                    let sequence_numbers = sequence_numbers
-                        .into_iter()
-                        .map(SequenceNumber::new)
-                        .collect::<Vec<_>>();
-
-                    Ok((ShardIndex::new(shard_index), sequence_numbers))
-                },
-            )
-            .collect::<Result<BTreeMap<_, _>, String>>()?;
-
-        Ok(Self { shards })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use data_types::Sequence;
-
-    #[test]
-    fn empty() {
-        let metas = vec![];
-        let summary: proto::WriteSummary = WriteSummary::new(metas).into();
-
-        let expected = proto::WriteSummary { shards: vec![] };
-
-        assert_eq!(summary, expected);
-    }
-
-    #[test]
-    fn one() {
-        let metas = vec![vec![make_meta(Sequence::new(
-            ShardIndex::new(1),
-            SequenceNumber::new(2),
-        ))]];
-        let summary: proto::WriteSummary = WriteSummary::new(metas).into();
-
-        let expected = proto::WriteSummary {
-            shards: vec![proto::ShardWrite {
-                shard_index: 1,
-                sequence_numbers: vec![2],
-            }],
-        };
-
-        assert_eq!(summary, expected);
-    }
-
-    #[test]
-    fn many() {
-        let metas = vec![
-            vec![
-                make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
-                make_meta(Sequence::new(ShardIndex::new(10), SequenceNumber::new(20))),
-            ],
-            vec![make_meta(Sequence::new(
-                ShardIndex::new(1),
-                SequenceNumber::new(3),
-            ))],
-        ];
-        let summary: proto::WriteSummary = WriteSummary::new(metas).into();
-
-        let expected = proto::WriteSummary {
-            shards: vec![
-                proto::ShardWrite {
-                    shard_index: 1,
-                    sequence_numbers: vec![2, 3],
-                },
-                proto::ShardWrite {
-                    shard_index: 10,
-                    sequence_numbers: vec![20],
-                },
-            ],
-        };
-
-        assert_eq!(summary, expected);
-    }
-
-    #[test]
-    fn different_order() {
-        // order in sequences shouldn't matter
-        let metas1 = vec![vec![
-            make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
-            make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(3))),
-        ]];
-
-        // order in sequences shouldn't matter
-        let metas2 = vec![vec![
-            make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(3))),
-            make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
-        ]];
-
-        let summary1: proto::WriteSummary = WriteSummary::new(metas1).into();
-        let summary2: proto::WriteSummary = WriteSummary::new(metas2).into();
-
-        let expected = proto::WriteSummary {
-            shards: vec![
-                proto::ShardWrite {
-                    shard_index: 1,
-                    sequence_numbers: vec![2],
-                },
-                proto::ShardWrite {
-                    shard_index: 2,
-                    sequence_numbers: vec![3],
-                },
-            ],
-        };
-
-        assert_eq!(summary1, expected);
-        assert_eq!(summary2, expected);
-    }
-
-    #[test]
-    fn token_creation() {
-        let metas = vec![vec![make_meta(Sequence::new(
-            ShardIndex::new(1),
-            SequenceNumber::new(2),
-        ))]];
-        let summary = WriteSummary::new(metas.clone());
-        let summary_copy = WriteSummary::new(metas);
-
-        let metas2 = vec![vec![make_meta(Sequence::new(
-            ShardIndex::new(2),
-            SequenceNumber::new(3),
-        ))]];
-        let summary2 = WriteSummary::new(metas2);
-
-        let token = summary.to_token();
-
-        // non empty
-        assert!(!token.is_empty());
-
-        // same when created with same metas
-        assert_eq!(token, summary_copy.to_token());
-
-        // different when created with different metas
-        assert_ne!(token, summary2.to_token());
-
-        assert!(
-            !token.contains("sequenceNumbers"),
-            "token not obscured: {token}"
-        );
-        assert!(!token.contains("shards"), "token not obscured: {token}");
-    }
-
-    #[test]
-    fn token_parsing() {
-        let metas = vec![vec![make_meta(Sequence::new(
-            ShardIndex::new(1),
-            SequenceNumber::new(2),
-        ))]];
-        let summary = WriteSummary::new(metas);
-
-        let token = summary.clone().to_token();
-
-        // round trip should parse to the same summary
-        let new_summary = WriteSummary::try_from_token(&token).expect("parsing successful");
-        assert_eq!(summary, new_summary);
-    }
-
-    #[test]
-    #[should_panic(expected = "Invalid write token, invalid base64")]
-    fn token_parsing_bad_base64() {
-        let token = "foo%%";
-        WriteSummary::try_from_token(token).unwrap();
-    }
-
-    #[test]
-    #[should_panic(expected = "Invalid write token, non utf8 data in write token")]
-    fn token_parsing_bad_utf8() {
-        let token = BASE64_STANDARD.encode(vec![0xa0, 0xa1]);
-        WriteSummary::try_from_token(&token).unwrap();
-    }
-
-    #[test]
-    #[should_panic(expected = "Invalid write token, protobuf decode error: key must be a string")]
-    fn token_parsing_bad_proto() {
-        let token = BASE64_STANDARD.encode("{not_valid_json}");
-        WriteSummary::try_from_token(&token).unwrap();
-    }
-
-    #[test]
-    fn no_progress() {
-        let summary = test_summary();
-
-        // if we have no info about this shard in the progress
-        let shard_index = ShardIndex::new(1);
-        let progress = ShardProgress::new();
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::ShardUnknown)
-        );
-    }
-
-    #[test]
-    fn unknown_shard() {
-        let summary = test_summary();
-        // No information on shard index 3
-        let shard_index = ShardIndex::new(3);
-        let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
-        let err = summary.write_status(shard_index, &progress).unwrap_err();
-        assert_eq!(err.to_string(), "Unknown shard index: 3");
-    }
-
-    #[test]
-    fn readable() {
-        let summary = test_summary();
-
-        // shard index 1 made it to sequence number 3
-        let shard_index = ShardIndex::new(1);
-        let progress = ShardProgress::new().with_buffered(SequenceNumber::new(3));
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Readable)
-        );
-
-        // if shard index 1 only made it to sequence number 2, but write includes sequence number 3
-        let shard_index = ShardIndex::new(1);
-        let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Durable)
-        );
-
-        // shard index 2 made it to sequence number 2
-        let shard_index = ShardIndex::new(2);
-        let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
-
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Readable)
-        );
-    }
-
-    #[test]
-    fn persisted() {
-        let summary = test_summary();
-
-        // shard index 1 has persisted up to sequence number 3
-        let shard_index = ShardIndex::new(1);
-        let progress = ShardProgress::new().with_persisted(SequenceNumber::new(3));
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Persisted)
-        );
-
-        // shard index 2 has persisted up to sequence number 2
-        let shard_index = ShardIndex::new(2);
-        let progress = ShardProgress::new().with_persisted(SequenceNumber::new(2));
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Persisted)
-        );
-
-        // shard index 1 only persisted up to sequence number 2, have buffered data at sequence
-        // number 3
-        let shard_index = ShardIndex::new(1);
-        let progress = ShardProgress::new()
-            .with_buffered(SequenceNumber::new(3))
-            .with_persisted(SequenceNumber::new(2));
-
-        assert_eq!(
-            summary.write_status(shard_index, &progress),
-            Ok(ShardWriteStatus::Readable)
-        );
-    }
-
-    /// Return a write summary that describes a write with:
-    /// shard 1 --> sequences 2 and 3
-    /// shard 2 --> sequence 1
-    fn test_summary() -> WriteSummary {
-        let metas = vec![vec![
-            make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
-            make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(3))),
-            make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(1))),
-        ]];
-        WriteSummary::new(metas)
-    }
-
-    fn make_meta(s: Sequence) -> DmlMeta {
-        use iox_time::TimeProvider;
-        let time_provider = iox_time::SystemProvider::new();
-
-        let span_context = None;
-        let bytes_read = 132;
-        DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read)
-    }
-}
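The summary-to-token round trip that the tests above exercise piecemeal, in one place; a sketch against the deleted crate API:

    use data_types::{Sequence, SequenceNumber, ShardIndex};
    use dml::DmlMeta;
    use iox_time::{SystemProvider, TimeProvider};
    use write_summary::WriteSummary;

    fn round_trip() {
        let time_provider = SystemProvider::new();
        let meta = DmlMeta::sequenced(
            Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
            time_provider.now(),
            None, // no span context
            132,  // bytes read
        );
        let summary = WriteSummary::new(vec![vec![meta]]);
        let token = summary.clone().to_token();
        // The token is opaque to clients but fully reversible server-side.
        assert_eq!(WriteSummary::try_from_token(&token), Ok(summary));
    }
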
diff --git a/write_summary/src/progress.rs b/write_summary/src/progress.rs
deleted file mode 100644
index 2f453c1ae5..0000000000
--- a/write_summary/src/progress.rs
+++ /dev/null
@@ -1,399 +0,0 @@
-use data_types::SequenceNumber;
-
-/// Information on how much data a particular shard has processed
-///
-/// ```text
-/// Write Lifecycle (compaction not shown):
-///
-/// Durable --------------> Readable -------------> Persisted
-///
-/// in shard,               in memory,              in parquet
-/// not readable.           not yet in parquet
-/// ```
-///
-/// Note: min_readable_sequence_number <= min_totally_persisted_sequence_number
-#[derive(Clone, Debug, PartialEq, Eq, Default)]
-pub struct ShardProgress {
-    /// Smallest sequence number of data that is buffered in memory
-    min_buffered: Option<SequenceNumber>,
-
-    /// Largest sequence number of data that is buffered in memory
-    max_buffered: Option<SequenceNumber>,
-
-    /// Largest sequence number of data that has been written to parquet
-    max_persisted: Option<SequenceNumber>,
-
-    /// The sequence number that is actively buffering, if any. The
-    /// actively buffering sequence number is not yet completely
-    /// buffered to all partitions, and thus is excluded from the
-    /// min/max buffered calculation.
-    actively_buffering: Option<SequenceNumber>,
-}
-
-impl ShardProgress {
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    /// Note that `sequence_number` is buffered
-    pub fn with_buffered(mut self, sequence_number: SequenceNumber) -> Self {
-        // clamp if there is any actively buffering operation
-        let sequence_number = if let Some(v) = clamp(sequence_number, self.actively_buffering) {
-            v
-        } else {
-            return self;
-        };
-
-        self.min_buffered = Some(
-            self.min_buffered
-                .take()
-                .map(|cur| cur.min(sequence_number))
-                .unwrap_or(sequence_number),
-        );
-        self.max_buffered = Some(
-            self.max_buffered
-                .take()
-                .map(|cur| cur.max(sequence_number))
-                .unwrap_or(sequence_number),
-        );
-        self
-    }
-
-    /// Note that the specified sequence number is still actively
-    /// buffering, adjusting all subsequent sequence numbers
-    /// accordingly.
-    pub fn actively_buffering(mut self, sequence_number: Option<SequenceNumber>) -> Self {
-        self.actively_buffering = sequence_number;
-        self
-    }
-
-    /// Note that data with `sequence_number` was persisted; note this does not
-    /// mean that all sequence numbers less than `sequence_number`
-    /// have been persisted.
-    pub fn with_persisted(mut self, sequence_number: SequenceNumber) -> Self {
-        self.max_persisted = Some(
-            self.max_persisted
-                .take()
-                .map(|cur| cur.max(sequence_number))
-                .unwrap_or(sequence_number),
-        );
-        self
-    }
-
-    /// Return true if this shard progress has no information on
-    /// shard progress, false otherwise
-    pub fn is_empty(&self) -> bool {
-        self.min_buffered.is_none() && self.max_buffered.is_none() && self.max_persisted.is_none()
-    }
-
-    // return true if this sequence number is readable
-    pub fn readable(&self, sequence_number: SequenceNumber) -> bool {
-        match (&self.max_buffered, &self.max_persisted) {
-            (Some(max_buffered), Some(max_persisted)) => {
-                &sequence_number <= max_buffered || &sequence_number <= max_persisted
-            }
-            (None, Some(max_persisted)) => &sequence_number <= max_persisted,
-            (Some(max_buffered), _) => &sequence_number <= max_buffered,
-            (None, None) => {
-                false // data not yet ingested
-            }
-        }
-    }
-
-    // return true if this sequence number is persisted
-    pub fn persisted(&self, sequence_number: SequenceNumber) -> bool {
-        match (&self.min_buffered, &self.max_persisted) {
-            (Some(min_buffered), Some(max_persisted)) => {
-                // with both buffered and persisted data, need to
-                // ensure that no data is buffered to know that all is
-                // persisted
-                &sequence_number < min_buffered && &sequence_number <= max_persisted
-            }
-            (None, Some(max_persisted)) => &sequence_number <= max_persisted,
-            (_, None) => {
-                false // data not yet persisted
-            }
-        }
-    }
-
-    /// Combine the values from other
-    pub fn combine(self, other: Self) -> Self {
-        let updated = if let Some(min_buffered) = other.min_buffered {
-            self.with_buffered(min_buffered)
-        } else {
-            self
-        };
-
-        let updated = if let Some(max_buffered) = other.max_buffered {
-            updated.with_buffered(max_buffered)
-        } else {
-            updated
-        };
-
-        if let Some(max_persisted) = other.max_persisted {
-            updated.with_persisted(max_persisted)
-        } else {
-            updated
-        }
-    }
-}
-
-// Ensures that `buffered` is less than `actively_buffering`,
-// returning the new buffered value
-fn clamp(
-    buffered: SequenceNumber,
-    actively_buffering: Option<SequenceNumber>,
-) -> Option<SequenceNumber> {
-    let completed_buffering = if let Some(val) = actively_buffering {
-        // returns `None` if no sequence number has completed buffering yet
-        let min_sequence = (val.get() as u64).checked_sub(1)?;
-        SequenceNumber::new(min_sequence as i64)
-    } else {
-        return Some(buffered);
-    };
-
-    if buffered > completed_buffering {
-        Some(completed_buffering)
-    } else {
-        // no adjustment needed
-        Some(buffered)
-    }
-}
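Reading `readable` and `persisted` together: a sequence number is readable once it is at or below the buffered or persisted high-water mark, but persisted only when nothing at or below it is still buffered. A short sketch with the deleted types:

    use data_types::SequenceNumber;
    use write_summary::ShardProgress;

    fn lifecycle() {
        let progress = ShardProgress::new()
            .with_buffered(SequenceNumber::new(3))
            .with_persisted(SequenceNumber::new(2));

        // 2 is covered by the persisted high-water mark, so it is readable...
        assert!(progress.readable(SequenceNumber::new(2)));
        // ...and 3 is readable (buffered) but, being buffered, not yet persisted.
        assert!(progress.readable(SequenceNumber::new(3)));
        assert!(!progress.persisted(SequenceNumber::new(3)));
    }
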
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn empty() {
-        let progress = ShardProgress::new();
-        let sequence_number = SequenceNumber::new(0);
-        assert!(!progress.readable(sequence_number));
-        assert!(!progress.persisted(sequence_number));
-    }
-
-    #[test]
-    fn persisted() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        let progress = ShardProgress::new().with_persisted(eq);
-
-        assert!(progress.readable(lt));
-        assert!(progress.persisted(lt));
-
-        // persisted implies it is also readable
-        assert!(progress.readable(eq));
-        assert!(progress.persisted(eq));
-
-        assert!(!progress.readable(gt));
-        assert!(!progress.persisted(gt));
-    }
-
-    #[test]
-    fn buffered() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        let progress = ShardProgress::new().with_buffered(eq);
-
-        assert!(progress.readable(lt));
-        assert!(!progress.persisted(lt));
-
-        assert!(progress.readable(eq));
-        assert!(!progress.persisted(eq));
-
-        assert!(!progress.readable(gt));
-        assert!(!progress.persisted(gt));
-    }
-
-    #[test]
-    fn buffered_greater_than_persisted() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        let progress = ShardProgress::new().with_buffered(eq).with_persisted(lt);
-
-        assert!(progress.readable(lt));
-        assert!(progress.persisted(lt));
-
-        assert!(progress.readable(eq));
-        assert!(!progress.persisted(eq));
-
-        assert!(!progress.readable(gt));
-        assert!(!progress.persisted(gt));
-    }
-
-    #[test]
-    fn buffered_and_persisted() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        let progress = ShardProgress::new().with_buffered(eq).with_persisted(eq);
-
-        assert!(progress.readable(lt));
-        assert!(progress.persisted(lt));
-
-        assert!(progress.readable(eq));
-        assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted here
-
-        assert!(!progress.readable(gt));
-        assert!(!progress.persisted(gt));
-    }
-
-    #[test]
-    fn buffered_less_than_persisted() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        // data buffered between lt and eq
-        let progress = ShardProgress::new()
-            .with_buffered(lt)
-            .with_buffered(eq)
-            .with_persisted(eq);
-
-        assert!(progress.readable(lt));
-        assert!(!progress.persisted(lt)); // have buffered data at lt, can't be persisted
-
-        assert!(progress.readable(eq));
-        assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted
-
-        assert!(!progress.readable(gt));
-        assert!(!progress.persisted(gt));
-    }
-
-    #[test]
-    fn combine() {
-        let lt = SequenceNumber::new(0);
-        let eq = SequenceNumber::new(1);
-        let gt = SequenceNumber::new(2);
-
-        let progress1 = ShardProgress::new().with_buffered(gt);
-
-        let progress2 = ShardProgress::new().with_buffered(lt).with_persisted(eq);
-
-        let expected = ShardProgress::new()
-            .with_buffered(lt)
-            .with_buffered(gt)
-            .with_persisted(eq);
-
-        assert_eq!(progress1.combine(progress2), expected);
-    }
-
-    #[test]
-    fn actively_buffering() {
-        let num0 = SequenceNumber::new(0);
-        let num1 = SequenceNumber::new(1);
-        let num2 = SequenceNumber::new(2);
-
-        #[derive(Debug)]
-        struct Expected {
-            min_buffered: Option<SequenceNumber>,
-            max_buffered: Option<SequenceNumber>,
-        }
-
-        let cases = vec![
-            // No buffering
-            (
-                ShardProgress::new()
-                    .actively_buffering(None)
-                    .with_buffered(num1)
-                    .with_buffered(num2),
-                Expected {
-                    min_buffered: Some(num1),
-                    max_buffered: Some(num2),
-                },
-            ),
-            // actively buffering num2
-            (
-                ShardProgress::new()
-                    .actively_buffering(Some(num2))
-                    .with_buffered(num1)
-                    .with_buffered(num2),
-                Expected {
-                    min_buffered: Some(num1),
-                    max_buffered: Some(num1),
-                },
-            ),
-            // actively buffering only one
-            (
-                ShardProgress::new()
-                    .actively_buffering(Some(num1))
-                    .with_buffered(num1),
-                Expected {
-                    min_buffered: Some(num0),
-                    max_buffered: Some(num0),
-                },
-            ),
-            // actively buffering, haven't buffered any yet
-            (
-                ShardProgress::new()
-                    .actively_buffering(Some(num1))
-                    .with_buffered(num0),
-                Expected {
-                    min_buffered: Some(num0),
-                    max_buffered: Some(num0),
-                },
-            ),
-            // actively buffering, haven't buffered any
-            (
-                ShardProgress::new().actively_buffering(Some(num0)),
-                Expected {
-                    min_buffered: None,
-                    max_buffered: None,
-                },
-            ),
-            // actively buffering partially buffered
-            (
-                ShardProgress::new()
-                    .actively_buffering(Some(num0))
-                    .with_buffered(num0),
-                Expected {
-                    min_buffered: None,
-                    max_buffered: None,
-                },
-            ),
-        ];
-
-        for (progress, expected) in cases {
-            println!("Comparing {progress:?} to {expected:?}");
-            assert_eq!(
-                progress.min_buffered, expected.min_buffered,
-                "min buffered mismatch"
-            );
-            assert_eq!(
-                progress.max_buffered, expected.max_buffered,
-                "max buffered mismatch"
-            );
-            assert_eq!(progress.max_persisted, None, "unexpected persisted");
-        }
-    }
-
-    #[test]
-    fn test_clamp() {
-        let num0 = SequenceNumber::new(0);
-        let num1 = SequenceNumber::new(1);
-        let num2 = SequenceNumber::new(2);
-
-        assert_eq!(clamp(num0, None), Some(num0));
-
-        // num0 hasn't completed buffering yet
-        assert_eq!(clamp(num0, Some(num0)), None);
-
-        assert_eq!(clamp(num1, Some(num0)), None);
-        assert_eq!(clamp(num1, Some(num1)), Some(num0));
-        assert_eq!(clamp(num1, Some(num2)), Some(num1));
-
-        assert_eq!(clamp(num2, Some(num0)), None);
-        assert_eq!(clamp(num2, Some(num1)), Some(num0));
-        assert_eq!(clamp(num2, Some(num2)), Some(num1));
-    }
-}