fix: Remove the write_summary crate and write info service

pull/24376/head
Carol (Nichols || Goulding) 2023-02-22 15:01:30 -05:00
parent ffe8714956
commit 6387a9576a
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
29 changed files with 42 additions and 1458 deletions

17
Cargo.lock generated
View File

@ -3233,7 +3233,6 @@ dependencies = [
"tokio-util",
"trace",
"workspace-hack",
"write_summary",
]
[[package]]
@ -4839,7 +4838,6 @@ dependencies = [
"tonic",
"trace",
"workspace-hack",
"write_summary",
]
[[package]]
@ -6866,21 +6864,6 @@ dependencies = [
"zstd-sys",
]
[[package]]
name = "write_summary"
version = "0.1.0"
dependencies = [
"base64 0.21.0",
"data_types",
"dml",
"generated_types",
"iox_time",
"observability_deps",
"serde_json",
"snafu",
"workspace-hack",
]
[[package]]
name = "xz2"
version = "0.1.7"

View File

@ -81,7 +81,6 @@ members = [
"trogging",
"wal",
"workspace-hack",
"write_summary",
]
default-members = ["influxdb_iox"]

View File

@ -2300,20 +2300,6 @@ impl TimestampMinMax {
}
}
/// Specifies the status of data in the ingestion process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardWriteStatus {
/// Nothing is known about this write (e.g. it refers to a shard for which we have no
/// information)
ShardUnknown,
/// The data has not yet been processed by the ingester, and thus is unreadable
Durable,
/// The data is readable, but not yet persisted
Readable,
/// The data is both readable and persisted to parquet
Persisted,
}
#[cfg(test)]
mod tests {
use std::borrow::Cow;

View File

@ -47,7 +47,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let sharder_path = root.join("influxdata/iox/sharder/v1");
let wal_path = root.join("influxdata/iox/wal/v1");
let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
let write_summary_path = root.join("influxdata/iox/write_summary/v1");
let storage_path = root.join("influxdata/platform/storage");
let storage_errors_path = root.join("influxdata/platform/errors");
@ -59,7 +58,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
delete_path.join("service.proto"),
ingester_path.join("parquet_metadata.proto"),
ingester_path.join("query.proto"),
ingester_path.join("write_info.proto"),
ingester_path.join("write.proto"),
ingester_path.join("replication.proto"),
ingester_path.join("persist.proto"),
@ -76,7 +74,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
sharder_path.join("sharder.proto"),
wal_path.join("wal.proto"),
write_buffer_path.join("write_buffer.proto"),
write_summary_path.join("write_summary.proto"),
storage_path.join("predicate.proto"),
storage_path.join("service.proto"),
storage_path.join("source.proto"),

View File

@ -1,57 +0,0 @@
syntax = "proto3";
package influxdata.iox.ingester.v1;
option go_package = "github.com/influxdata/iox/ingester/v1";
// NOTE: This is an ALPHA / Internal API that is used as part of the
// end to end tests.
//
// A public API is tracked here:
// <https://github.com/influxdata/influxdb_iox/issues/4354>
service WriteInfoService {
// Get information about a particular write
rpc GetWriteInfo(GetWriteInfoRequest) returns (GetWriteInfoResponse);
}
message GetWriteInfoRequest {
// The write token returned from a write that was written to one or
// more shards
string write_token = 1;
}
message GetWriteInfoResponse {
// Renamed from kafka_partition_infos to shard_infos
reserved 3;
reserved "kafka_partition_infos";
// Information for all shards in this write
repeated ShardInfo shard_infos = 4;
}
// Status of a part of a write in a particular shard
message ShardInfo {
// Unique shard index
int32 shard_index = 1;
// the status of the data for this shard
ShardStatus status = 2;
}
// the state
enum ShardStatus {
// Unspecified status, will result in an error.
SHARD_STATUS_UNSPECIFIED = 0;
// The ingester has not yet processed data in this write
SHARD_STATUS_DURABLE = 1;
// The ingester has processed the data in this write and it is
// readable (will be included in a query response)?
SHARD_STATUS_READABLE = 2;
// The ingester has processed the data in this write and it is both
// readable and completely persisted to parquet files.
SHARD_STATUS_PERSISTED = 3;
// The ingester does not have information about this shard
SHARD_STATUS_UNKNOWN = 4;
}

View File

@ -1,24 +0,0 @@
syntax = "proto3";
package influxdata.iox.write_summary.v1;
option go_package = "github.com/influxdata/iox/write_summary/v1";
// Represents a single logical write that was partitioned and sharded
// into multiple pieces in multiple shards (kafka partitions)
message WriteSummary {
// Renamed from sequencers to shards
reserved 1;
reserved "sequencers";
// per shard index (kafka partition) information
repeated ShardWrite shards = 2;
}
// Per shard (kafka partition) information about what sequence
// numbers contain part of a write
message ShardWrite {
// Unique shard index (kafka partition).
int32 shard_index = 1;
// Which sequence numbers for this shard had data
repeated int64 sequence_numbers = 2;
}

View File

@ -196,19 +196,6 @@ pub mod influxdata {
));
}
}
pub mod write_summary {
pub mod v1 {
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.serde.rs"
));
}
}
}
pub mod pbdata {
@ -281,8 +268,6 @@ pub mod compactor;
pub mod delete_predicate;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod ingester;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod write_info;
pub use prost::{DecodeError, EncodeError};

View File

@ -1,155 +0,0 @@
use crate::influxdata::iox::ingester::v1 as proto;
use data_types::ShardWriteStatus;
use std::collections::HashMap;
impl From<ShardWriteStatus> for proto::ShardStatus {
fn from(status: ShardWriteStatus) -> Self {
match status {
ShardWriteStatus::ShardUnknown => Self::Unknown,
ShardWriteStatus::Durable => Self::Durable,
ShardWriteStatus::Readable => Self::Readable,
ShardWriteStatus::Persisted => Self::Persisted,
}
}
}
impl proto::ShardStatus {
/// Convert the status to a number such that higher numbers are later in the data lifecycle.
/// For use in merging multiple write status gRPC responses into one response.
fn status_order(&self) -> u8 {
match self {
Self::Unspecified => panic!("Unspecified status"),
Self::Unknown => 0,
Self::Durable => 1,
Self::Readable => 2,
Self::Persisted => 3,
}
}
}
impl proto::ShardInfo {
fn merge(&mut self, other: &Self) {
let self_status = self.status();
let other_status = other.status();
let new_status = match self_status.status_order().cmp(&other_status.status_order()) {
std::cmp::Ordering::Less => other_status,
std::cmp::Ordering::Equal => self_status,
std::cmp::Ordering::Greater => self_status,
};
self.set_status(new_status);
}
}
/// "Merges" the partition information for write info responses so that the "most recent"
/// information is returned.
pub fn merge_responses(
responses: impl IntoIterator<Item = proto::GetWriteInfoResponse>,
) -> proto::GetWriteInfoResponse {
// Map shard index to status
let mut shard_infos: HashMap<_, proto::ShardInfo> = HashMap::new();
responses
.into_iter()
.flat_map(|res| res.shard_infos.into_iter())
.for_each(|info| {
shard_infos
.entry(info.shard_index)
.and_modify(|existing_info| existing_info.merge(&info))
.or_insert(info);
});
let shard_infos = shard_infos.into_values().collect();
proto::GetWriteInfoResponse { shard_infos }
}
#[cfg(test)]
mod tests {
use super::*;
use proto::{ShardInfo, ShardStatus};
#[test]
fn test_merge() {
#[derive(Debug)]
struct Test<'a> {
left: &'a ShardInfo,
right: &'a ShardInfo,
expected: &'a ShardInfo,
}
let durable = ShardInfo {
shard_index: 1,
status: ShardStatus::Durable.into(),
};
let readable = ShardInfo {
shard_index: 1,
status: ShardStatus::Readable.into(),
};
let persisted = ShardInfo {
shard_index: 1,
status: ShardStatus::Persisted.into(),
};
let unknown = ShardInfo {
shard_index: 1,
status: ShardStatus::Unknown.into(),
};
let tests = vec![
Test {
left: &unknown,
right: &unknown,
expected: &unknown,
},
Test {
left: &unknown,
right: &durable,
expected: &durable,
},
Test {
left: &unknown,
right: &readable,
expected: &readable,
},
Test {
left: &durable,
right: &unknown,
expected: &durable,
},
Test {
left: &readable,
right: &readable,
expected: &readable,
},
Test {
left: &durable,
right: &durable,
expected: &durable,
},
Test {
left: &readable,
right: &durable,
expected: &readable,
},
Test {
left: &persisted,
right: &durable,
expected: &persisted,
},
];
for test in tests {
let mut output = test.left.clone();
output.merge(test.right);
assert_eq!(
&output, test.expected,
"Mismatch\n\nOutput:\n{output:#?}\n\nTest:\n{test:#?}"
);
}
}
}

View File

@ -36,8 +36,5 @@ pub mod store;
/// Client for testing purposes.
pub mod test;
/// Client for fetching write info
pub mod write_info;
/// Client for write API
pub mod write;

View File

@ -1,52 +0,0 @@
use client_util::connection::GrpcConnection;
use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::ingester::v1::{
write_info_service_client, write_info_service_server, GetWriteInfoRequest,
GetWriteInfoResponse, ShardInfo, ShardStatus,
};
pub use generated_types::write_info::merge_responses;
}
/// A basic client for fetching information about write tokens from a
/// single ingester.
///
/// NOTE: This is an ALPHA / Internal API that is used as part of the
/// end to end tests.
///
/// A public API is tracked here:
/// <https://github.com/influxdata/influxdb_iox/issues/4354>
#[derive(Debug, Clone)]
pub struct Client {
inner: WriteInfoServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(connection: Connection) -> Self {
Self {
inner: WriteInfoServiceClient::new(connection.into_grpc_connection()),
}
}
/// Get the write information for a write token
pub async fn get_write_info(
&mut self,
write_token: &str,
) -> Result<GetWriteInfoResponse, Error> {
let response = self
.inner
.get_write_info(GetWriteInfoRequest {
write_token: write_token.to_string(),
})
.await?;
Ok(response.into_inner())
}
}

View File

@ -98,10 +98,6 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
builder,
rpc::namespace::namespace_service(Arc::clone(&self.database))
);
add_service!(
builder,
rpc::write_info::write_info_service(Arc::clone(&self.database))
);
add_service!(builder, self.server.handler().schema_service());
add_service!(builder, self.server.handler().catalog_service());
add_service!(builder, self.server.handler().object_store_service());

View File

@ -1,3 +1,2 @@
pub(crate) mod namespace;
pub(crate) mod query;
pub(crate) mod write_info;

View File

@ -1,48 +0,0 @@
//! WriteInfoService gRPC implementation
use generated_types::influxdata::iox::ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
};
use querier::QuerierDatabase;
use std::sync::Arc;
/// Acquire a [`WriteInfoService`] gRPC service implementation.
pub fn write_info_service(
server: Arc<QuerierDatabase>,
) -> WriteInfoServiceServer<impl WriteInfoService> {
WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server))
}
#[derive(Debug)]
struct QuerierWriteInfoServiceImpl {
server: Arc<QuerierDatabase>,
}
impl QuerierWriteInfoServiceImpl {
pub fn new(server: Arc<QuerierDatabase>) -> Self {
Self { server }
}
}
#[tonic::async_trait]
impl WriteInfoService for QuerierWriteInfoServiceImpl {
async fn get_write_info(
&self,
request: tonic::Request<proto::GetWriteInfoRequest>,
) -> Result<tonic::Response<proto::GetWriteInfoResponse>, tonic::Status> {
let proto::GetWriteInfoRequest { write_token } = request.into_inner();
let ingester_connection = self
.server
.ingester_connection()
.expect("Ingester connections must be configured to get write info");
let progresses = ingester_connection
.get_write_info(&write_token)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(tonic::Response::new(progresses))
}
}

View File

@ -24,4 +24,3 @@ tokio = { version = "1.27", features = ["macros", "net", "parking_lot", "rt-mult
tokio-util = { version = "0.7.7" }
trace = { path = "../trace" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
write_summary = { path = "../write_summary" }

View File

@ -27,7 +27,6 @@ use router::{
dml_handlers::{
lazy_connector::LazyConnector, DmlHandler, DmlHandlerChainExt, FanOutAdaptor,
InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
WriteSummaryAdapter,
},
namespace_cache::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ReadThroughCache,
@ -55,7 +54,6 @@ use std::{
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use write_summary::WriteSummary;
#[derive(Debug, Error)]
pub enum Error {
@ -96,7 +94,7 @@ impl<D, N> std::fmt::Debug for RpcWriteRouterServerType<D, N> {
#[async_trait]
impl<D, N> ServerType for RpcWriteRouterServerType<D, N>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = ()> + 'static,
N: NamespaceResolver + 'static,
{
/// Return the [`metric::Registry`] used by the router.
@ -318,7 +316,7 @@ pub async fn create_router2_server_type(
////////////////////////////////////////////////////////////////////////////
// # Parallel writer
let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(rpc_writer));
let parallel_write = FanOutAdaptor::new(rpc_writer);
// # Handler stack
//

View File

@ -18,11 +18,7 @@ use data_types::{
};
use datafusion::error::DataFusionError;
use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::{
influxdata::iox::ingester::v1::GetWriteInfoResponse,
ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest},
write_info::merge_responses,
};
use generated_types::ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest};
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
@ -223,10 +219,6 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
span: Option<Span>,
) -> Result<Vec<IngesterPartition>>;
/// Returns the most recent partition status info across all ingester(s) for the specified
/// write token.
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse>;
/// Return backend as [`Any`] which can be used to downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
}
@ -990,41 +982,11 @@ impl IngesterConnection for IngesterConnectionImpl {
Ok(ingester_partitions)
}
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
let responses = self
.unique_ingester_addresses
.iter()
.map(|ingester_address| execute_get_write_infos(ingester_address, write_token))
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(merge_responses(responses))
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
async fn execute_get_write_infos(
ingester_address: &str,
write_token: &str,
) -> Result<GetWriteInfoResponse, Error> {
let connection = connection::Builder::new()
.build(ingester_address)
.await
.context(ConnectingSnafu { ingester_address })?;
influxdb_iox_client::write_info::Client::new(connection)
.get_write_info(write_token)
.await
.context(WriteInfoSnafu {
ingester_address,
write_token,
})
}
/// A wrapper around the unpersisted data in a partition returned by
/// the ingester that (will) implement the `QueryChunk` interface
///

View File

@ -4,7 +4,6 @@ use super::IngesterConnection;
use async_trait::async_trait;
use data_types::NamespaceId;
use data_types::ShardIndex;
use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
use iox_query::util::create_basic_summary;
use parking_lot::Mutex;
use schema::Projection;
@ -116,10 +115,6 @@ impl IngesterConnection for MockIngesterConnection {
Ok(partitions)
}
async fn get_write_info(&self, _write_token: &str) -> super::Result<GetWriteInfoResponse> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}

View File

@ -46,7 +46,6 @@ tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
tonic = "0.8"
trace = { path = "../trace/" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
write_summary = { path = "../write_summary" }
[dev-dependencies]
assert_matches = "1.5"

View File

@ -40,7 +40,7 @@ where
U: Iterator<Item = T::WriteInput> + Send + Sync,
{
type WriteInput = I;
type WriteOutput = Vec<T::WriteOutput>;
type WriteOutput = ();
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
@ -54,7 +54,7 @@ where
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let results = input
input
.into_iter()
.map(|v| {
let namespace = namespace.clone();
@ -68,7 +68,7 @@ where
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
Ok(results)
Ok(())
}
/// Pass the delete through to the inner handler.

View File

@ -147,7 +147,6 @@ mod tests {
use data_types::TimestampRange;
use metric::Attributes;
use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use write_summary::WriteSummary;
use super::*;
use crate::dml_handlers::{mock::MockDmlHandler, DmlError};
@ -191,14 +190,10 @@ mod tests {
);
}
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_ok() {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

View File

@ -4,7 +4,6 @@ use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use super::{DmlError, DmlHandler};
@ -28,7 +27,7 @@ pub enum MockDmlHandlerCall<W> {
#[derive(Debug)]
struct Inner<W> {
calls: Vec<MockDmlHandlerCall<W>>,
write_return: VecDeque<Result<WriteSummary, DmlError>>,
write_return: VecDeque<Result<(), DmlError>>,
delete_return: VecDeque<Result<(), DmlError>>,
}
@ -61,10 +60,7 @@ impl<W> MockDmlHandler<W>
where
W: Clone,
{
pub fn with_write_return(
self,
ret: impl Into<VecDeque<Result<WriteSummary, DmlError>>>,
) -> Self {
pub fn with_write_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
self.0.lock().write_return = ret.into();
self
}
@ -99,7 +95,7 @@ where
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = W;
type WriteOutput = WriteSummary;
type WriteOutput = ();
async fn write(
&self,

View File

@ -68,8 +68,5 @@ pub use fan_out::*;
mod rpc_write;
pub use rpc_write::*;
mod write_summary;
pub use self::write_summary::*;
#[cfg(test)]
pub mod mock;

View File

@ -1,67 +0,0 @@
use std::fmt::Debug;
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use dml::DmlMeta;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use super::DmlHandler;
/// A [`WriteSummaryAdapter`] wraps DML Handler that produces
/// `Vec<Vec<DmlMeta>>` for each write, and produces a WriteSummary,
/// suitable for
/// sending back to a client
#[derive(Debug, Default)]
pub struct WriteSummaryAdapter<T> {
inner: T,
}
impl<T> WriteSummaryAdapter<T> {
/// Construct a [`WriteSummaryAdapter`] that passes DML operations to `inner`
/// concurrently.
pub fn new(inner: T) -> Self {
Self { inner }
}
}
#[async_trait]
impl<T> DmlHandler for WriteSummaryAdapter<T>
where
T: DmlHandler<WriteOutput = Vec<Vec<DmlMeta>>>,
{
type WriteInput = T::WriteInput;
type WriteOutput = WriteSummary;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
/// Sends `input` to the inner handler, which returns a
/// `Vec<Vec<DmlMeta>>`, creating a `WriteSummary`
async fn write(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let metas = self
.inner
.write(namespace, namespace_id, input, span_ctx)
.await?;
Ok(WriteSummary::new(metas))
}
/// Pass the delete through to the inner handler.
async fn delete(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, namespace_id, table_name, predicate, span_ctx)
.await
}
}

View File

@ -19,7 +19,6 @@ use std::{str::Utf8Error, sync::Arc, time::Instant};
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use self::{
delete_predicate::parse_http_delete_request,
@ -35,8 +34,6 @@ use crate::{
namespace_resolver::NamespaceResolver,
};
const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token";
/// Errors returned by the `router` HTTP request handler.
#[derive(Debug, Error)]
pub enum Error {
@ -312,7 +309,7 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
impl<D, N, T> HttpDelegate<D, N, T>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary>,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = ()>,
N: NamespaceResolver,
T: TimeProvider,
{
@ -349,10 +346,9 @@ where
(&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
_ => return Err(Error::NoHandler),
}
.map(|summary| {
.map(|_summary| {
Response::builder()
.status(StatusCode::NO_CONTENT)
.header(WRITE_TOKEN_HTTP_HEADER, summary.to_token())
.body(Body::empty())
.unwrap()
})
@ -362,7 +358,7 @@ where
&self,
req: Request<Body>,
write_info: WriteParams,
) -> Result<WriteSummary, Error> {
) -> Result<(), Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let token = req
@ -406,7 +402,7 @@ where
Ok(v) => v,
Err(mutable_batch_lp::Error::EmptyPayload) => {
debug!("nothing to write");
return Ok(WriteSummary::default());
return Ok(());
}
Err(e) => return Err(Error::ParseLineProtocol(e)),
};
@ -431,8 +427,7 @@ where
.get_namespace_id(&write_info.namespace)
.await?;
let summary = self
.dml_handler
self.dml_handler
.write(&write_info.namespace, namespace_id, batches, span_ctx)
.await
.map_err(Into::into)?;
@ -442,10 +437,10 @@ where
self.write_metric_tables.inc(num_tables as _);
self.write_metric_body_size.inc(body.len() as _);
Ok(summary)
Ok(())
}
async fn delete_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
async fn delete_handler(&self, req: Request<Body>) -> Result<(), Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let write_info = self.write_param_extractor.parse_v2(&req)?;
@ -491,9 +486,7 @@ where
self.delete_metric_body_size.inc(body.len() as _);
// TODO pass back write summaries for deletes as well
// https://github.com/influxdata/influxdb_iox/issues/4209
Ok(WriteSummary::default())
Ok(())
}
/// Parse the request's body into raw bytes, applying the configured size
@ -589,10 +582,6 @@ mod tests {
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
static NAMESPACE_NAME: &str = "bananas_test";
fn summary() -> WriteSummary {
WriteSummary::default()
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option<u64>) {
let counter = metrics
.get_instrument::<Metric<U64Counter>>(name)
@ -793,7 +782,7 @@ mod tests {
ok,
query_string = "?org=bananas&bucket=test",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -804,7 +793,7 @@ mod tests {
ok_precision_s,
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -820,7 +809,7 @@ mod tests {
ok_precision_ms,
query_string = "?org=bananas&bucket=test&precision=ms",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -836,7 +825,7 @@ mod tests {
ok_precision_us,
query_string = "?org=bananas&bucket=test&precision=us",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -852,7 +841,7 @@ mod tests {
ok_precision_ns,
query_string = "?org=bananas&bucket=test&precision=ns",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -869,7 +858,7 @@ mod tests {
// SECONDS, so multiplies the provided timestamp by 1,000,000,000
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = []
);
@ -878,7 +867,7 @@ mod tests {
no_query_params,
query_string = "",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::ParseV2Request(V2WriteParseError::NoQueryParams)
)),
@ -889,7 +878,7 @@ mod tests {
no_org_bucket,
query_string = "?",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::NoOrgBucketSpecified
@ -902,7 +891,7 @@ mod tests {
empty_org_bucket,
query_string = "?org=&bucket=",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::NoOrgBucketSpecified
@ -915,7 +904,7 @@ mod tests {
invalid_org_bucket,
query_string = format!("?org=test&bucket={}", "A".repeat(1000)),
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::MultiTenantError(
MultiTenantExtractError::InvalidOrgAndBucket(
OrgBucketMappingError::InvalidNamespaceName(
@ -930,7 +919,7 @@ mod tests {
invalid_line_protocol,
query_string = "?org=bananas&bucket=test",
body = "not line protocol".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = [] // None
);
@ -939,7 +928,7 @@ mod tests {
non_utf8_body,
query_string = "?org=bananas&bucket=test",
body = vec![0xc3, 0x28],
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::NonUtf8Body(_)),
want_dml_calls = [] // None
);
@ -967,7 +956,7 @@ mod tests {
.flat_map(|s| s.bytes())
.collect::<Vec<u8>>()
},
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Err(Error::RequestSizeExceeded(_)),
want_dml_calls = [] // None
);
@ -998,7 +987,7 @@ mod tests {
field_upsert_within_batch,
query_string = "?org=bananas&bucket=test",
body = "test field=1u 100\ntest field=2u 100".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, namespace_id, write_input}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -1151,7 +1140,7 @@ mod tests {
duplicate_fields_same_value,
query_string = "?org=bananas&bucket=test",
body = "whydo InputPower=300i,InputPower=300i".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -1168,7 +1157,7 @@ mod tests {
duplicate_fields_different_value,
query_string = "?org=bananas&bucket=test",
body = "whydo InputPower=300i,InputPower=42i".as_bytes(),
dml_handler = [Ok(summary())],
dml_handler = [Ok(())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input, ..}] => {
assert_eq!(namespace, NAMESPACE_NAME);
@ -1423,7 +1412,7 @@ mod tests {
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(summary())])
.with_write_return([Ok(())])
.with_delete_return([]),
);
let metrics = Arc::new(metric::Registry::default());
@ -1505,7 +1494,7 @@ mod tests {
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(summary())])
.with_write_return([Ok(())])
.with_delete_return([]),
);
let metrics = Arc::new(metric::Registry::default());
@ -1554,7 +1543,7 @@ mod tests {
let dml_handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Ok(summary()), Ok(summary()), Ok(summary())])
.with_write_return([Ok(()), Ok(()), Ok(())])
.with_delete_return([]),
);
let metrics = Arc::new(metric::Registry::default());

View File

@ -12,7 +12,7 @@ use router::{
dml_handlers::{
client::mock::MockWriteClient, Chain, DmlHandlerChainExt, FanOutAdaptor,
InstrumentationDecorator, Partitioned, Partitioner, RetentionValidator, RpcWrite,
SchemaValidator, WriteSummaryAdapter,
SchemaValidator,
},
namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache},
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
@ -127,11 +127,9 @@ type HttpDelegateStack = HttpDelegate<
>,
Partitioner,
>,
WriteSummaryAdapter<
FanOutAdaptor<
RpcWrite<Arc<MockWriteClient>>,
Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
>,
FanOutAdaptor<
RpcWrite<Arc<MockWriteClient>>,
Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
>,
>,
>,
@ -180,7 +178,7 @@ impl TestContext {
namespace_autocreation,
);
let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(rpc_writer));
let parallel_write = FanOutAdaptor::new(rpc_writer);
let handler_stack = retention_validator
.and_then(schema_validator)

View File

@ -97,7 +97,6 @@ impl From<Id> for String {
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
use std::convert::TryInto;
#[test]
@ -169,23 +168,4 @@ mod tests {
assert_eq!(expected_output, actual_output);
}
}
#[test]
fn test_deserialize_then_to_string() {
let i: Id = "0000111100001111".parse().unwrap();
assert_eq!(Id(NonZeroU64::new(18_764_712_120_593).unwrap()), i);
#[derive(Deserialize)]
struct WriteInfo {
org: Id,
}
let query = "org=0000111100001111";
let write_info: WriteInfo = serde_urlencoded::from_str(query).unwrap();
assert_eq!(
Id(NonZeroU64::new(18_764_712_120_593).unwrap()),
write_info.org
);
assert_eq!("0000111100001111", write_info.org.to_string());
}
}

View File

@ -1,19 +0,0 @@
[package]
name = "write_summary"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
base64 = "0.21"
data_types = { path = "../data_types" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
serde_json = "1.0.95"
snafu = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
iox_time = { path = "../iox_time" }

View File

@ -1,445 +0,0 @@
use base64::{prelude::BASE64_STANDARD, Engine};
use data_types::{SequenceNumber, ShardIndex, ShardWriteStatus};
use dml::DmlMeta;
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
use observability_deps::tracing::debug;
use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
mod progress;
pub use progress::ShardProgress;
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum Error {
#[snafu(display("Unknown shard index: {}", shard_index))]
UnknownShard { shard_index: ShardIndex },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Contains information about a single write.
///
/// A single write consisting of multiple lines of line protocol
/// formatted data are shared and partitioned across potentially
/// several shards which are then processed by the ingester to
/// become readable at potentially different times.
///
/// This struct contains sufficient information to determine the
/// current state of the write as a whole
#[derive(Debug, Default, Clone, PartialEq, Eq)]
/// Summary of a `Vec<Vec<DmlMeta>>`
pub struct WriteSummary {
/// Key is the shard index from the `DmlMeta` structure (aka kafka
/// partition id), value is the sequence numbers from that
/// shard.
///
/// Note: `BTreeMap` to ensure the output is in a consistent order
shards: BTreeMap<ShardIndex, Vec<SequenceNumber>>,
}
impl WriteSummary {
pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
debug!(?metas, "Creating write summary");
let sequences = metas
.iter()
.flat_map(|v| v.iter())
.filter_map(|meta| meta.sequence());
let mut shards = BTreeMap::new();
for s in sequences {
let shard_index = s.shard_index;
let sequence_number = s.sequence_number;
shards
.entry(shard_index)
.or_insert_with(Vec::new)
.push(sequence_number)
}
Self { shards }
}
/// Return an opaque summary "token" of this summary
pub fn to_token(self) -> String {
let proto_write_summary: proto::WriteSummary = self.into();
BASE64_STANDARD.encode(
serde_json::to_string(&proto_write_summary)
.expect("unexpected error serializing token to json"),
)
}
/// Return a WriteSummary from the "token" (created with [Self::to_token]), or error if not possible
pub fn try_from_token(token: &str) -> Result<Self, String> {
let data = BASE64_STANDARD
.decode(token)
.map_err(|e| format!("Invalid write token, invalid base64: {e}"))?;
let json = String::from_utf8(data)
.map_err(|e| format!("Invalid write token, non utf8 data in write token: {e}"))?;
let proto = serde_json::from_str::<proto::WriteSummary>(&json)
.map_err(|e| format!("Invalid write token, protobuf decode error: {e}"))?;
proto
.try_into()
.map_err(|e| format!("Invalid write token, invalid content: {e}"))
}
/// return what shard indexes from the write buffer were present in this write summary
pub fn shard_indexes(&self) -> Vec<ShardIndex> {
self.shards.keys().cloned().collect()
}
/// Given the write described by this summary, and the shard's progress for a particular
/// shard index, returns the status of that write in this write summary
pub fn write_status(
&self,
shard_index: ShardIndex,
progress: &ShardProgress,
) -> Result<ShardWriteStatus> {
let sequence_numbers = self
.shards
.get(&shard_index)
.context(UnknownShardSnafu { shard_index })?;
debug!(?shard_index, ?progress, ?sequence_numbers, "write_status");
if progress.is_empty() {
return Ok(ShardWriteStatus::ShardUnknown);
}
let is_persisted = sequence_numbers
.iter()
.all(|sequence_number| progress.persisted(*sequence_number));
if is_persisted {
return Ok(ShardWriteStatus::Persisted);
}
let is_readable = sequence_numbers
.iter()
.all(|sequence_number| progress.readable(*sequence_number));
if is_readable {
return Ok(ShardWriteStatus::Readable);
}
Ok(ShardWriteStatus::Durable)
}
}
impl From<WriteSummary> for proto::WriteSummary {
fn from(summary: WriteSummary) -> Self {
let shards = summary
.shards
.into_iter()
.map(|(shard_index, sequence_numbers)| proto::ShardWrite {
shard_index: shard_index.get(),
sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(),
})
.collect();
Self { shards }
}
}
impl TryFrom<proto::WriteSummary> for WriteSummary {
type Error = String;
fn try_from(summary: proto::WriteSummary) -> Result<Self, Self::Error> {
let shards = summary
.shards
.into_iter()
.map(
|proto::ShardWrite {
shard_index,
sequence_numbers,
}| {
let sequence_numbers = sequence_numbers
.into_iter()
.map(SequenceNumber::new)
.collect::<Vec<_>>();
Ok((ShardIndex::new(shard_index), sequence_numbers))
},
)
.collect::<Result<BTreeMap<_, _>, String>>()?;
Ok(Self { shards })
}
}
#[cfg(test)]
mod tests {
use super::*;
use data_types::Sequence;
#[test]
fn empty() {
let metas = vec![];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary { shards: vec![] };
assert_eq!(summary, expected);
}
#[test]
fn one() {
let metas = vec![vec![make_meta(Sequence::new(
ShardIndex::new(1),
SequenceNumber::new(2),
))]];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
shards: vec![proto::ShardWrite {
shard_index: 1,
sequence_numbers: vec![2],
}],
};
assert_eq!(summary, expected);
}
#[test]
fn many() {
let metas = vec![
vec![
make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
make_meta(Sequence::new(ShardIndex::new(10), SequenceNumber::new(20))),
],
vec![make_meta(Sequence::new(
ShardIndex::new(1),
SequenceNumber::new(3),
))],
];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
shards: vec![
proto::ShardWrite {
shard_index: 1,
sequence_numbers: vec![2, 3],
},
proto::ShardWrite {
shard_index: 10,
sequence_numbers: vec![20],
},
],
};
assert_eq!(summary, expected);
}
#[test]
fn different_order() {
// order in sequences shouldn't matter
let metas1 = vec![vec![
make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(3))),
]];
// order in sequences shouldn't matter
let metas2 = vec![vec![
make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(3))),
make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
]];
let summary1: proto::WriteSummary = WriteSummary::new(metas1).into();
let summary2: proto::WriteSummary = WriteSummary::new(metas2).into();
let expected = proto::WriteSummary {
shards: vec![
proto::ShardWrite {
shard_index: 1,
sequence_numbers: vec![2],
},
proto::ShardWrite {
shard_index: 2,
sequence_numbers: vec![3],
},
],
};
assert_eq!(summary1, expected);
assert_eq!(summary2, expected);
}
#[test]
fn token_creation() {
let metas = vec![vec![make_meta(Sequence::new(
ShardIndex::new(1),
SequenceNumber::new(2),
))]];
let summary = WriteSummary::new(metas.clone());
let summary_copy = WriteSummary::new(metas);
let metas2 = vec![vec![make_meta(Sequence::new(
ShardIndex::new(2),
SequenceNumber::new(3),
))]];
let summary2 = WriteSummary::new(metas2);
let token = summary.to_token();
// non empty
assert!(!token.is_empty());
// same when created with same metas
assert_eq!(token, summary_copy.to_token());
// different when created with different metas
assert_ne!(token, summary2.to_token());
assert!(
!token.contains("sequenceNumbers"),
"token not obscured: {token}"
);
assert!(!token.contains("shards"), "token not obscured: {token}");
}
#[test]
fn token_parsing() {
let metas = vec![vec![make_meta(Sequence::new(
ShardIndex::new(1),
SequenceNumber::new(2),
))]];
let summary = WriteSummary::new(metas);
let token = summary.clone().to_token();
// round trip should parse to the same summary
let new_summary = WriteSummary::try_from_token(&token).expect("parsing successful");
assert_eq!(summary, new_summary);
}
#[test]
#[should_panic(expected = "Invalid write token, invalid base64")]
fn token_parsing_bad_base64() {
let token = "foo%%";
WriteSummary::try_from_token(token).unwrap();
}
#[test]
#[should_panic(expected = "Invalid write token, non utf8 data in write token")]
fn token_parsing_bad_utf8() {
let token = BASE64_STANDARD.encode(vec![0xa0, 0xa1]);
WriteSummary::try_from_token(&token).unwrap();
}
#[test]
#[should_panic(expected = "Invalid write token, protobuf decode error: key must be a string")]
fn token_parsing_bad_proto() {
let token = BASE64_STANDARD.encode("{not_valid_json}");
WriteSummary::try_from_token(&token).unwrap();
}
#[test]
fn no_progress() {
let summary = test_summary();
// if we have no info about this shard in the progress
let shard_index = ShardIndex::new(1);
let progress = ShardProgress::new();
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::ShardUnknown)
);
}
#[test]
fn unknown_shard() {
let summary = test_summary();
// No information on shard index 3
let shard_index = ShardIndex::new(3);
let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
let err = summary.write_status(shard_index, &progress).unwrap_err();
assert_eq!(err.to_string(), "Unknown shard index: 3");
}
#[test]
fn readable() {
let summary = test_summary();
// shard index 1 made it to sequence number 3
let shard_index = ShardIndex::new(1);
let progress = ShardProgress::new().with_buffered(SequenceNumber::new(3));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Readable)
);
// if shard index 1 only made it to sequence number 2, but write includes sequence number 3
let shard_index = ShardIndex::new(1);
let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Durable)
);
// shard index 2 made it to sequence number 2
let shard_index = ShardIndex::new(2);
let progress = ShardProgress::new().with_buffered(SequenceNumber::new(2));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Readable)
);
}
#[test]
fn persisted() {
let summary = test_summary();
// shard index 1 has persisted up to sequence number 3
let shard_index = ShardIndex::new(1);
let progress = ShardProgress::new().with_persisted(SequenceNumber::new(3));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Persisted)
);
// shard index 2 has persisted up to sequence number 2
let shard_index = ShardIndex::new(2);
let progress = ShardProgress::new().with_persisted(SequenceNumber::new(2));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Persisted)
);
// shard index 1 only persisted up to sequence number 2, have buffered data at sequence
// number 3
let shard_index = ShardIndex::new(1);
let progress = ShardProgress::new()
.with_buffered(SequenceNumber::new(3))
.with_persisted(SequenceNumber::new(2));
assert_eq!(
summary.write_status(shard_index, &progress),
Ok(ShardWriteStatus::Readable)
);
}
/// Return a write summary that describes a write with:
/// shard 1 --> sequence 3
/// shard 2 --> sequence 1
fn test_summary() -> WriteSummary {
let metas = vec![vec![
make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(2))),
make_meta(Sequence::new(ShardIndex::new(1), SequenceNumber::new(3))),
make_meta(Sequence::new(ShardIndex::new(2), SequenceNumber::new(1))),
]];
WriteSummary::new(metas)
}
fn make_meta(s: Sequence) -> DmlMeta {
use iox_time::TimeProvider;
let time_provider = iox_time::SystemProvider::new();
let span_context = None;
let bytes_read = 132;
DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read)
}
}

View File

@ -1,399 +0,0 @@
use data_types::SequenceNumber;
/// Information on how much data a particular shard has processed
///
/// ```text
/// Write Lifecycle (compaction not shown):
///
/// Durable --------------> Readable -------------> Persisted
///
/// in shard, in memory, not yet in parquet
/// not readable. in parquet
/// ```
///
/// Note: min_readable_sequence_number <= min_totally_persisted_sequence_number
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ShardProgress {
/// Smallest sequence number of data that is buffered in memory
min_buffered: Option<SequenceNumber>,
/// Largest sequence number of data that is buffered in memory
max_buffered: Option<SequenceNumber>,
/// Largest sequence number of data that has been written to parquet
max_persisted: Option<SequenceNumber>,
/// The sequence number that is actively buffering, if any. The
/// actively buffering sequence number is not yet completely
/// buffered to all partitions, and thus is excluded from the
/// min/max buffered calculation.
actively_buffering: Option<SequenceNumber>,
}
impl ShardProgress {
pub fn new() -> Self {
Default::default()
}
/// Note that `sequence_number` is buffered
pub fn with_buffered(mut self, sequence_number: SequenceNumber) -> Self {
// clamp if there is any actively buffering operation
let sequence_number = if let Some(v) = clamp(sequence_number, self.actively_buffering) {
v
} else {
return self;
};
self.min_buffered = Some(
self.min_buffered
.take()
.map(|cur| cur.min(sequence_number))
.unwrap_or(sequence_number),
);
self.max_buffered = Some(
self.max_buffered
.take()
.map(|cur| cur.max(sequence_number))
.unwrap_or(sequence_number),
);
self
}
/// Note that the specified sequence number is still actively
/// buffering, and adjusting all subsequent sequence numbers
/// accordingly.
pub fn actively_buffering(mut self, sequence_number: Option<SequenceNumber>) -> Self {
self.actively_buffering = sequence_number;
self
}
/// Note that data with `sequence_number` was persisted; Note this does not
/// mean that all sequence numbers less than `sequence_number`
/// have been persisted.
pub fn with_persisted(mut self, sequence_number: SequenceNumber) -> Self {
self.max_persisted = Some(
self.max_persisted
.take()
.map(|cur| cur.max(sequence_number))
.unwrap_or(sequence_number),
);
self
}
/// Return true if this shard progress has no information on
/// shard progress, false otherwise
pub fn is_empty(&self) -> bool {
self.min_buffered.is_none() && self.max_buffered.is_none() && self.max_persisted.is_none()
}
// return true if this sequence number is readable
pub fn readable(&self, sequence_number: SequenceNumber) -> bool {
match (&self.max_buffered, &self.max_persisted) {
(Some(max_buffered), Some(max_persisted)) => {
&sequence_number <= max_buffered || &sequence_number <= max_persisted
}
(None, Some(max_persisted)) => &sequence_number <= max_persisted,
(Some(max_buffered), _) => &sequence_number <= max_buffered,
(None, None) => {
false // data not yet ingested
}
}
}
// return true if this sequence number is persisted
pub fn persisted(&self, sequence_number: SequenceNumber) -> bool {
// with both buffered and persisted data, need to
// ensure that no data is buffered to know that all is
// persisted
match (&self.min_buffered, &self.max_persisted) {
(Some(min_buffered), Some(max_persisted)) => {
// with both buffered and persisted data, need to
// ensure that no data is buffered to know that all is
// persisted
&sequence_number < min_buffered && &sequence_number <= max_persisted
}
(None, Some(max_persisted)) => &sequence_number <= max_persisted,
(_, None) => {
false // data not yet persisted
}
}
}
/// Combine the values from other
pub fn combine(self, other: Self) -> Self {
let updated = if let Some(min_buffered) = other.min_buffered {
self.with_buffered(min_buffered)
} else {
self
};
let updated = if let Some(max_buffered) = other.max_buffered {
updated.with_buffered(max_buffered)
} else {
updated
};
if let Some(max_persisted) = other.max_persisted {
updated.with_persisted(max_persisted)
} else {
updated
}
}
}
// Ensures that `buffered` is less than `actively_buffering`,
// returning the new buffered value
fn clamp(
buffered: SequenceNumber,
actively_buffering: Option<SequenceNumber>,
) -> Option<SequenceNumber> {
let completed_buffering = if let Some(val) = actively_buffering {
// returns `None` if no sequence number has completed buffering yet
let min_sequence = (val.get() as u64).checked_sub(1)?;
SequenceNumber::new(min_sequence as i64)
} else {
return Some(buffered);
};
if buffered > completed_buffering {
Some(completed_buffering)
} else {
// no adjustment needed
Some(buffered)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty() {
let progress = ShardProgress::new();
let sequence_number = SequenceNumber::new(0);
assert!(!progress.readable(sequence_number));
assert!(!progress.persisted(sequence_number));
}
#[test]
fn persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = ShardProgress::new().with_persisted(eq);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
// persisted implies it is also readable
assert!(progress.readable(eq));
assert!(progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = ShardProgress::new().with_buffered(eq);
assert!(progress.readable(lt));
assert!(!progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_greater_than_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = ShardProgress::new().with_buffered(eq).with_persisted(lt);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq));
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_and_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress = ShardProgress::new().with_buffered(eq).with_persisted(eq);
assert!(progress.readable(lt));
assert!(progress.persisted(lt));
assert!(progress.readable(eq));
assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted here
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn buffered_less_than_persisted() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
// data buffered between lt and eq
let progress = ShardProgress::new()
.with_buffered(lt)
.with_buffered(eq)
.with_persisted(eq);
assert!(progress.readable(lt));
assert!(!progress.persisted(lt)); // have buffered data at lt, can't be persisted
assert!(progress.readable(eq));
assert!(!progress.persisted(eq)); // have buffered data, so can't be persisted
assert!(!progress.readable(gt));
assert!(!progress.persisted(gt));
}
#[test]
fn combine() {
let lt = SequenceNumber::new(0);
let eq = SequenceNumber::new(1);
let gt = SequenceNumber::new(2);
let progress1 = ShardProgress::new().with_buffered(gt);
let progress2 = ShardProgress::new().with_buffered(lt).with_persisted(eq);
let expected = ShardProgress::new()
.with_buffered(lt)
.with_buffered(gt)
.with_persisted(eq);
assert_eq!(progress1.combine(progress2), expected);
}
#[test]
fn actively_buffering() {
let num0 = SequenceNumber::new(0);
let num1 = SequenceNumber::new(1);
let num2 = SequenceNumber::new(2);
#[derive(Debug)]
struct Expected {
min_buffered: Option<SequenceNumber>,
max_buffered: Option<SequenceNumber>,
}
let cases = vec![
// No buffering
(
ShardProgress::new()
.actively_buffering(None)
.with_buffered(num1)
.with_buffered(num2),
Expected {
min_buffered: Some(num1),
max_buffered: Some(num2),
},
),
// actively buffering num2
(
ShardProgress::new()
.actively_buffering(Some(num2))
.with_buffered(num1)
.with_buffered(num2),
Expected {
min_buffered: Some(num1),
max_buffered: Some(num1),
},
),
// actively buffering only one
(
ShardProgress::new()
.actively_buffering(Some(num1))
.with_buffered(num1),
Expected {
min_buffered: Some(num0),
max_buffered: Some(num0),
},
),
// actively buffering, haven't buffed any yet
(
ShardProgress::new()
.actively_buffering(Some(num1))
.with_buffered(num0),
Expected {
min_buffered: Some(num0),
max_buffered: Some(num0),
},
),
// actively buffering, haven't buffered any
(
ShardProgress::new().actively_buffering(Some(num0)),
Expected {
min_buffered: None,
max_buffered: None,
},
),
// actively buffering partially buffered
(
ShardProgress::new()
.actively_buffering(Some(num0))
.with_buffered(num0),
Expected {
min_buffered: None,
max_buffered: None,
},
),
];
for (progress, expected) in cases {
println!("Comparing {progress:?} to {expected:?}");
assert_eq!(
progress.min_buffered, expected.min_buffered,
"min buffered mismatch"
);
assert_eq!(
progress.max_buffered, expected.max_buffered,
"max buffered mismatch"
);
assert_eq!(progress.max_persisted, None, "unexpected persisted");
}
}
#[test]
fn test_clamp() {
let num0 = SequenceNumber::new(0);
let num1 = SequenceNumber::new(1);
let num2 = SequenceNumber::new(2);
assert_eq!(clamp(num0, None), Some(num0));
// num0 hasn't completed buffering yet
assert_eq!(clamp(num0, Some(num0)), None);
assert_eq!(clamp(num1, Some(num0)), None);
assert_eq!(clamp(num1, Some(num1)), Some(num0));
assert_eq!(clamp(num1, Some(num2)), Some(num1));
assert_eq!(clamp(num2, Some(num0)), None);
assert_eq!(clamp(num2, Some(num1)), Some(num0));
assert_eq!(clamp(num2, Some(num2)), Some(num1));
}
}