feat: return write_token from HTTP writes to router2 (#4202)

* feat: return write_token from HTTP writes to router2

* fix: Update router2/src/dml_handlers/instrumentation.rs

Co-authored-by: Dom <dom@itsallbroken.com>

* refactor: Use WriteSummary::default more vigorously

* fix: fix typo and add links to follow-on issues

Co-authored-by: Dom <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2022-04-02 06:34:51 -04:00 committed by GitHub
parent 3aa3ebe0e8
commit 833c10c083
25 changed files with 504 additions and 85 deletions

Cargo.lock (generated)
View File

@ -2747,6 +2747,7 @@ dependencies = [
"trace_http",
"workspace-hack",
"write_buffer",
"write_summary",
]
[[package]]
@ -4755,6 +4756,7 @@ dependencies = [
"trace",
"workspace-hack",
"write_buffer",
"write_summary",
]
[[package]]
@ -6901,6 +6903,19 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "write_summary"
version = "0.1.0"
dependencies = [
"base64 0.13.0",
"data_types",
"dml",
"generated_types",
"serde_json",
"time 0.1.0",
"workspace-hack",
]
[[package]]
name = "xml-rs"
version = "0.8.4"

View File

@ -78,6 +78,7 @@ members = [
"trogging",
"workspace-hack",
"write_buffer",
"write_summary",
]
default-members = ["influxdb_iox"]

View File

@ -816,3 +816,15 @@ impl IngesterQueryRequest {
}
}
}
/// Information about which sequence numbers were stored for a
/// particular (line protocol) write that may have been sharded /
/// partitioned across multiple sequencers
///
/// This information can be used to wait for a particular write to
/// become readable.
#[derive(Debug, Clone)]
pub struct SequencerWrites {
/// List of sequences
pub sequencers: Vec<Sequence>,
}
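For illustration, a minimal sketch of the value this struct would hold for a write that was sharded across two sequencers; it assumes a `Sequence` constructor of the form `Sequence::new(sequencer_id, sequence_number)`, as used by the write_summary tests later in this change:

// Hypothetical example: the write landed on sequencer 1 (sequence number 2)
// and on sequencer 10 (sequence number 20).
let writes = SequencerWrites {
    sequencers: vec![Sequence::new(1, 2), Sequence::new(10, 20)],
};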

View File

@ -14,7 +14,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts a [`predicate::predicate::Predicate`] into [`read_buffer::Predicate`],
/// Converts a [`predicate::Predicate`] into [`read_buffer::Predicate`],
/// suitable for evaluating on the ReadBuffer.
///
/// NOTE: a valid Read Buffer predicate is not guaranteed to be applicable to an

View File

@ -43,6 +43,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let schema_path = root.join("influxdata/iox/schema/v1");
let storage_path = root.join("influxdata/platform/storage");
let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
let write_summary_path = root.join("influxdata/iox/write_summary/v1");
let proto_files = vec![
delete_path.join("service.proto"),
@ -77,6 +78,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
storage_path.join("storage_common.proto"),
storage_path.join("test.proto"),
write_buffer_path.join("write_buffer.proto"),
write_summary_path.join("write_summary.proto"),
];
// Tell cargo to recompile if any of these proto files are changed

View File

@ -0,0 +1,18 @@
syntax = "proto3";
package influxdata.iox.write_summary.v1;
// Represents a single logical write that was partitioned and sharded
// into multiple pieces across multiple sequencers
message WriteSummary {
// per sequencer information
repeated SequencerWrite sequencers = 1;
}
// Per sequencer information about what sequence numbers contain part of a write
message SequencerWrite {
// Unique sequencer ID.
uint32 sequencer_id = 1;
// Which sequence numbers for this sequencer had data
repeated uint64 sequence_numbers = 13;
}
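As a reference for the shape of these messages, here is a small sketch using the prost-generated Rust types (the same types exercised by the write_summary tests added in this change):

use generated_types::influxdata::iox::write_summary::v1 as proto;

// A single logical write that was sharded across two sequencers.
let summary = proto::WriteSummary {
    sequencers: vec![
        proto::SequencerWrite {
            sequencer_id: 1,
            sequence_numbers: vec![2, 3],
        },
        proto::SequencerWrite {
            sequencer_id: 10,
            sequence_numbers: vec![20],
        },
    ],
};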

View File

@ -156,6 +156,19 @@ pub mod influxdata {
));
}
}
pub mod write_summary {
pub mod v1 {
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.serde.rs"
));
}
}
}
pub mod pbdata {

View File

@ -1,8 +1,8 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
maybe_skip_integration, query_until_results, rand_name, write_to_router, ServerFixture,
TestConfig,
get_write_token, maybe_skip_integration, query_when_readable, rand_name, write_to_router,
ServerFixture, TestConfig,
};
#[tokio::test]
@ -26,12 +26,14 @@ async fn smoke() {
let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let write_token = get_write_token(&response);
// run query in a loop until the data becomes available
let sql = format!("select * from {}", table_name);
let batches = query_until_results(
let batches = query_when_readable(
sql,
namespace,
write_token,
all_in_one.server().querier_grpc_connection(),
)
.await;

View File

@ -1,7 +1,7 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
maybe_skip_integration, query_until_results, MiniCluster, TestConfig,
get_write_token, maybe_skip_integration, query_when_readable, MiniCluster, TestConfig,
};
#[tokio::test]
@ -28,11 +28,16 @@ async fn basic_on_parquet() {
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// assert that the response contains a write token
let write_token = get_write_token(&response);
assert!(!write_token.is_empty());
// run query in a loop until the data becomes available
let sql = format!("select * from {}", table_name);
let batches = query_until_results(
let batches = query_when_readable(
sql,
cluster.namespace(),
write_token,
cluster.querier().querier_grpc_connection(),
)
.await;

View File

@ -22,6 +22,7 @@ time = { path = "../time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
# Crates.io dependencies, in alphabetical order
arrow-flight = "11"

View File

@ -17,16 +17,17 @@ use router2::{
dml_handlers::{
DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use thiserror::Error;
use write_summary::WriteSummary;
use ioxd_common::{
add_service,
@ -71,7 +72,7 @@ impl<D> RouterServerType<D> {
#[async_trait]
impl<D> ServerType for RouterServerType<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>> + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Return the [`metric::Registry`] used by the router.
fn metric_registry(&self) -> Arc<Registry> {
@ -233,6 +234,8 @@ pub async fn create_router2_server_type(
//
////////////////////////////////////////////////////////////////////////////
let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer));
// Build the chain of DML handlers that forms the request processing
// pipeline, starting with the namespace creator (for testing purposes) and
// write partitioner that yields a set of partitioned batches.
@ -248,7 +251,7 @@ pub async fn create_router2_server_type(
.and_then(InstrumentationDecorator::new(
"parallel_write",
&*metrics,
FanOutAdaptor::new(write_buffer),
parallel_write,
));
// Record the overall request handling latency

View File

@ -35,6 +35,7 @@ tonic = "0.6"
trace = { path = "../trace/" }
workspace-hack = { path = "../workspace-hack"}
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
[dev-dependencies]
assert_matches = "1.5"

View File

@ -7,6 +7,7 @@ use iox_catalog::{interface::Catalog, mem::MemCatalog};
use router2::{
dml_handlers::{
DmlHandlerChainExt, FanOutAdaptor, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
@ -66,8 +67,9 @@ fn e2e_benchmarks(c: &mut Criterion) {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack =
schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer)));
let handler_stack = schema_validator.and_then(
partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))),
);
HttpDelegate::new(1024, Arc::new(handler_stack), &metrics)
};

View File

@ -2,7 +2,7 @@ use super::DmlHandler;
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use futures::{stream::FuturesUnordered, TryStreamExt};
use std::{fmt::Debug, future, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData};
use trace::ctx::SpanContext;
/// A [`FanOutAdaptor`] takes an iterator of DML write operation inputs and
@ -38,7 +38,7 @@ where
U: Iterator<Item = T::WriteInput> + Send + Sync,
{
type WriteInput = I;
type WriteOutput = ();
type WriteOutput = Vec<T::WriteOutput>;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
@ -51,7 +51,7 @@ where
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
input
let results = input
.into_iter()
.map(|v| {
let namespace = namespace.clone();
@ -59,10 +59,9 @@ where
async move { self.inner.write(&namespace, v, span_ctx).await }
})
.collect::<FuturesUnordered<_>>()
.try_for_each(|_| future::ready(Ok(())))
.try_collect::<Vec<_>>()
.await?;
Ok(())
Ok(results)
}
/// Pass the delete through to the inner handler.

View File

@ -165,6 +165,7 @@ mod tests {
use metric::Attributes;
use std::sync::Arc;
use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use write_summary::WriteSummary;
const HANDLER_NAME: &str = "bananas";
@ -205,10 +206,14 @@ mod tests {
);
}
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_ok() {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let metrics = Arc::new(metric::Registry::default());
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

View File

@ -6,6 +6,7 @@ use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
/// A captured call to a [`MockDmlHandler`], generic over `W`, the captured
/// [`DmlHandler::WriteInput`] type.
@ -25,7 +26,7 @@ pub enum MockDmlHandlerCall<W> {
#[derive(Debug)]
struct Inner<W> {
calls: Vec<MockDmlHandlerCall<W>>,
write_return: VecDeque<Result<(), DmlError>>,
write_return: VecDeque<Result<WriteSummary, DmlError>>,
delete_return: VecDeque<Result<(), DmlError>>,
}
@ -58,7 +59,10 @@ impl<W> MockDmlHandler<W>
where
W: Clone,
{
pub fn with_write_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
pub fn with_write_return(
self,
ret: impl Into<VecDeque<Result<WriteSummary, DmlError>>>,
) -> Self {
self.0.lock().write_return = ret.into();
self
}
@ -93,7 +97,7 @@ where
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = W;
type WriteOutput = ();
type WriteOutput = WriteSummary;
async fn write(
&self,

View File

@ -99,5 +99,8 @@ pub use chain::*;
mod fan_out;
pub use fan_out::*;
mod write_summary;
pub use self::write_summary::*;
#[cfg(test)]
pub mod mock;

View File

@ -11,7 +11,6 @@ use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use std::{
fmt::{Debug, Display},
future,
sync::Arc,
};
use thiserror::Error;
@ -81,7 +80,7 @@ where
type DeleteError = ShardError;
type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
type WriteOutput = ();
type WriteOutput = Vec<DmlMeta>;
/// Shard `writes` and dispatch the resultant DML operations.
async fn write(
@ -162,30 +161,37 @@ where
/// Enumerates all items in the iterator, maps each to a future that dispatches
/// the [`DmlOperation`] to its paired [`Sequencer`], executes all the futures
/// in parallel and gathers any errors.
async fn parallel_enqueue<T>(v: T) -> Result<(), ShardError>
///
/// Returns a list of the sequences that were written
async fn parallel_enqueue<T>(v: T) -> Result<Vec<DmlMeta>, ShardError>
where
T: Iterator<Item = (Arc<Sequencer>, DmlOperation)> + Send,
{
let mut successes = 0;
let errs = v
.map(|(sequencer, op)| async move {
tokio::spawn(async move { sequencer.enqueue(op).await })
.await
.expect("shard enqueue panic")
})
.collect::<FuturesUnordered<_>>()
.filter_map(|v| {
if v.is_ok() {
successes += 1;
}
future::ready(v.err())
})
.collect::<Vec<WriteBufferError>>()
.await;
let mut successes = vec![];
let mut errs = vec![];
v.map(|(sequencer, op)| async move {
tokio::spawn(async move { sequencer.enqueue(op).await })
.await
.expect("shard enqueue panic")
})
// Use FuturesUnordered so the futures can run in parallel
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
// Sort the result into successes/failures upon completion
.into_iter()
.for_each(|v| match v {
Ok(meta) => successes.push(meta),
Err(e) => errs.push(e),
});
match errs.len() {
0 => Ok(()),
_n => Err(ShardError::WriteBufferErrors { successes, errs }),
0 => Ok(successes),
_n => Err(ShardError::WriteBufferErrors {
successes: successes.len(),
errs,
}),
}
}

View File

@ -0,0 +1,60 @@
use super::DmlHandler;
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use dml::DmlMeta;
use std::fmt::Debug;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
/// A [`WriteSummaryAdapter`] wraps a DML handler that produces a
/// `Vec<Vec<DmlMeta>>` for each write and converts the result into a
/// [`WriteSummary`] suitable for sending back to a client
#[derive(Debug, Default)]
pub struct WriteSummaryAdapter<T> {
inner: T,
}
impl<T> WriteSummaryAdapter<T> {
/// Construct a [`WriteSummaryAdapter`] that passes DML operations to `inner`
/// concurrently.
pub fn new(inner: T) -> Self {
Self { inner }
}
}
#[async_trait]
impl<T> DmlHandler for WriteSummaryAdapter<T>
where
T: DmlHandler<WriteOutput = Vec<Vec<DmlMeta>>>,
{
type WriteInput = T::WriteInput;
type WriteOutput = WriteSummary;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
/// Sends `input` to the inner handler, which returns a
/// `Vec<Vec<DmlMeta>>`, and converts the result into a `WriteSummary`
async fn write(
&self,
namespace: &DatabaseName<'static>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let metas = self.inner.write(namespace, input, span_ctx).await?;
Ok(WriteSummary::new(metas))
}
/// Pass the delete through to the inner handler.
async fn delete(
&self,
namespace: &DatabaseName<'static>,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, table_name, predicate, span_ctx)
.await
}
}
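Usage mirrors the wiring added elsewhere in this change (ioxd_router2, the router2 benchmark, and the gRPC test context): the adapter wraps the fan-out stage so the completed handler chain yields a `WriteSummary` instead of `()`. A condensed sketch, where `schema_validator`, `partitioner`, and `write_buffer` stand in for the values built by the surrounding setup code:

// Wrap the fan-out stage so the chain's WriteOutput becomes a WriteSummary.
let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer));
let handler_stack = schema_validator.and_then(partitioner.and_then(parallel_write));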

View File

@ -15,6 +15,7 @@ use schema::selection::Selection;
use std::ops::DerefMut;
use tonic::{Request, Response, Status};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
@ -45,7 +46,7 @@ impl<D> GrpcDelegate<D> {
impl<D> GrpcDelegate<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>> + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Acquire a [`WriteService`] gRPC service implementation.
///
@ -159,6 +160,9 @@ where
"routing grpc write",
);
// TODO return the produced WriteSummary to the client
// https://github.com/influxdata/influxdb_iox/issues/4208
self.dml_handler
.write(&namespace, tables, span_ctx)
.await
@ -265,10 +269,14 @@ mod tests {
use super::*;
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_no_batch() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest::default();
@ -285,7 +293,7 @@ mod tests {
#[tokio::test]
async fn test_write_no_namespace() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
@ -307,7 +315,7 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {

View File

@ -18,6 +18,9 @@ use serde::Deserialize;
use thiserror::Error;
use time::{SystemProvider, TimeProvider};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token";
/// Errors returned by the `router2` HTTP request handler.
#[derive(Debug, Error)]
@ -263,7 +266,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
impl<D, T> HttpDelegate<D, T>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary>,
T: TimeProvider,
{
/// Routes `req` to the appropriate handler, if any, returning the handler
@ -274,10 +277,16 @@ where
(&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
_ => return Err(Error::NoHandler),
}
.map(|_| response_no_content())
.map(|summary| {
Response::builder()
.status(StatusCode::NO_CONTENT)
.header(WRITE_TOKEN_HTTP_HEADER, summary.to_token())
.body(Body::empty())
.unwrap()
})
}
async fn write_handler(&self, req: Request<Body>) -> Result<(), Error> {
async fn write_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let write_info = WriteInfo::try_from(&req)?;
@ -300,7 +309,7 @@ where
Ok(v) => v,
Err(mutable_batch_lp::Error::EmptyPayload) => {
debug!("nothing to write");
return Ok(());
return Ok(WriteSummary::default());
}
Err(e) => return Err(Error::ParseLineProtocol(e)),
};
@ -318,7 +327,8 @@ where
"routing write",
);
self.dml_handler
let summary = self
.dml_handler
.write(&namespace, batches, span_ctx)
.await
.map_err(Into::into)?;
@ -328,10 +338,10 @@ where
self.write_metric_tables.inc(num_tables as _);
self.write_metric_body_size.inc(body.len() as _);
Ok(())
Ok(summary)
}
async fn delete_handler(&self, req: Request<Body>) -> Result<(), Error> {
async fn delete_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let account = WriteInfo::try_from(&req)?;
@ -376,7 +386,9 @@ where
self.delete_metric_body_size.inc(body.len() as _);
Ok(())
// TODO pass back write summaries for deletes as well
// https://github.com/influxdata/influxdb_iox/issues/4209
Ok(WriteSummary::default())
}
/// Parse the request's body into raw bytes, applying the configured size
@ -437,13 +449,6 @@ where
}
}
fn response_no_content() -> Response<Body> {
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap()
}
#[cfg(test)]
mod tests {
use std::{io::Write, iter, sync::Arc};
@ -460,6 +465,10 @@ mod tests {
const MAX_BYTES: usize = 1024;
fn summary() -> WriteSummary {
WriteSummary::default()
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option<u64>) {
let counter = metrics
.get_instrument::<Metric<U64Counter>>(name)
@ -639,7 +648,7 @@ mod tests {
ok,
query_string = "?org=bananas&bucket=test",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, "bananas_test");
@ -650,7 +659,7 @@ mod tests {
ok_precision_s,
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -665,7 +674,7 @@ mod tests {
ok_precision_ms,
query_string = "?org=bananas&bucket=test&precision=ms",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -680,7 +689,7 @@ mod tests {
ok_precision_us,
query_string = "?org=bananas&bucket=test&precision=us",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -695,7 +704,7 @@ mod tests {
ok_precision_ns,
query_string = "?org=bananas&bucket=test&precision=ns",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -711,7 +720,7 @@ mod tests {
// SECONDS, so multiplies the provided timestamp by 1,000,000,000
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = []
);
@ -720,7 +729,7 @@ mod tests {
no_query_params,
query_string = "",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)),
want_dml_calls = [] // None
);
@ -729,7 +738,7 @@ mod tests {
no_org_bucket,
query_string = "?",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::DecodeFail(_))),
want_dml_calls = [] // None
);
@ -738,7 +747,7 @@ mod tests {
empty_org_bucket,
query_string = "?org=&bucket=",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)),
want_dml_calls = [] // None
);
@ -747,7 +756,7 @@ mod tests {
invalid_org_bucket,
query_string = format!("?org=test&bucket={}", "A".repeat(1000)),
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::MappingFail(_))),
want_dml_calls = [] // None
);
@ -756,7 +765,7 @@ mod tests {
invalid_line_protocol,
query_string = "?org=bananas&bucket=test",
body = "not line protocol".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = [] // None
);
@ -765,7 +774,7 @@ mod tests {
non_utf8_body,
query_string = "?org=bananas&bucket=test",
body = vec![0xc3, 0x28],
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::NonUtf8Body(_)),
want_dml_calls = [] // None
);
@ -793,7 +802,7 @@ mod tests {
.flat_map(|s| s.bytes())
.collect::<Vec<u8>>()
},
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::RequestSizeExceeded(_)),
want_dml_calls = [] // None
);

View File

@ -10,7 +10,7 @@ use router2::{
dml_handlers::{
Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator,
ShardedWriteBuffer,
ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
@ -55,9 +55,11 @@ type HttpDelegateStack = HttpDelegate<
>,
Partitioner,
>,
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>,
Vec<Partitioned<HashMap<String, MutableBatch>>>,
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>,
Vec<Partitioned<HashMap<String, MutableBatch>>>,
>,
>,
>,
>,
@ -109,7 +111,9 @@ impl TestContext {
let handler_stack = ns_creator
.and_then(schema_validator)
.and_then(partitioner)
.and_then(FanOutAdaptor::new(sharded_write_buffer));
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);

View File

@ -34,20 +34,41 @@ pub async fn write_to_router(
.expect("http error sending write")
}
/// Runs a query using the flight API on the specified connection
/// until responses are produced
///
/// The retry loop is used to wait for writes to become visible
/// Extracts the write token from the specified response (to the `/api/v2/write` api)
pub fn get_write_token(response: &Response<Body>) -> String {
let message = format!("no write token in {:?}", response);
response
.headers()
.get("X-IOx-Write-Token")
.expect(&message)
.to_str()
.expect("Value not a string")
.to_string()
}
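Typical usage in the end-to-end tests updated by this change, shown here as a condensed sketch (`lp`, `org`, `bucket`, and the router base URL come from the surrounding test setup):

let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);

// The write token comes back in the X-IOx-Write-Token response header
let write_token = get_write_token(&response);
assert!(!write_token.is_empty());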
const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;
pub async fn query_until_results(
/// Runs a query using the flight API on the specified connection
/// until responses are produced.
///
/// Will eventually wait until data from the specified write token
/// is readable, but currently just retries until the query returns results.
///
/// The retry loop is used to wait for writes to become visible
pub async fn query_when_readable(
sql: impl Into<String>,
namespace: impl Into<String>,
write_token: impl Into<String>,
connection: Connection,
) -> Vec<RecordBatch> {
let namespace = namespace.into();
let sql = sql.into();
println!(
"(TODO) Waiting for Write Token to be visible {}",
write_token.into()
);
let mut client = influxdb_iox_client::flight::Client::new(connection);
// This does nothing except test the client handshake implementation.

write_summary/Cargo.toml (new file)
View File

@ -0,0 +1,26 @@
[package]
name = "write_summary"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order
base64 = "0.13"
serde_json = "1.0.79"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
data_types = { path = "../data_types" }
time = { path = "../time" }
# Crates.io dependencies, in alphabetical order

write_summary/src/lib.rs (new file)
View File

@ -0,0 +1,199 @@
use std::collections::BTreeMap;
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
use dml::DmlMeta;
/// Contains information about a single write.
///
/// A single write consisting of multiple lines of line protocol
/// formatted data is sharded and partitioned across potentially
/// several sequencers, and is then processed by the ingester to
/// become readable at potentially different times.
///
/// This struct contains sufficient information to determine the
/// current state of the write as a whole
/// Summary of a `Vec<Vec<DmlMeta>>`
#[derive(Debug, Default)]
pub struct WriteSummary {
metas: Vec<Vec<DmlMeta>>,
}
impl WriteSummary {
pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
Self { metas }
}
/// Return an opaque "token" representing this summary
pub fn to_token(self) -> String {
let proto_write_summary: proto::WriteSummary = self.into();
base64::encode(
serde_json::to_string(&proto_write_summary)
.expect("unexpected error serializing token to json"),
)
}
}
impl From<WriteSummary> for proto::WriteSummary {
fn from(summary: WriteSummary) -> Self {
// create a map from sequencer_id to sequences
let sequences = summary
.metas
.iter()
.flat_map(|v| v.iter())
.filter_map(|meta| meta.sequence());
// Use BTreeMap to ensure consistent output
let mut sequencers = BTreeMap::new();
for s in sequences {
sequencers
.entry(s.sequencer_id)
.or_insert_with(Vec::new)
.push(s.sequence_number)
}
let sequencers = sequencers
.into_iter()
.map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite {
sequencer_id,
sequence_numbers,
})
.collect();
Self { sequencers }
}
}
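The token is meant to be opaque to clients (the token_creation test below asserts that field names are not visible in it), but for debugging it can be decoded by reversing the encoding in `to_token`. A hedged sketch; `decode_token` is a hypothetical helper, not part of this change, and it assumes the pbjson-generated serde impls included in generated_types above:

// Hypothetical debugging helper: reverse the base64 + JSON encoding of the token.
fn decode_token(token: &str) -> proto::WriteSummary {
    let json = base64::decode(token).expect("token is not valid base64");
    serde_json::from_slice(&json).expect("token is not valid write summary JSON")
}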
#[cfg(test)]
mod tests {
use super::*;
use data_types::sequence::Sequence;
#[test]
fn empty() {
let metas = vec![];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary { sequencers: vec![] };
assert_eq!(summary, expected);
}
#[test]
fn one() {
let metas = vec![vec![make_meta(Sequence::new(1, 2))]];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
sequencers: vec![proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2],
}],
};
assert_eq!(summary, expected);
}
#[test]
fn many() {
let metas = vec![
vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(10, 20)),
],
vec![make_meta(Sequence::new(1, 3))],
];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
sequencers: vec![
proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2, 3],
},
proto::SequencerWrite {
sequencer_id: 10,
sequence_numbers: vec![20],
},
],
};
assert_eq!(summary, expected);
}
#[test]
fn different_order() {
// order in sequences shouldn't matter
let metas1 = vec![vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(2, 3)),
]];
// order in sequences shouldn't matter
let metas2 = vec![vec![
make_meta(Sequence::new(2, 3)),
make_meta(Sequence::new(1, 2)),
]];
let summary1: proto::WriteSummary = WriteSummary::new(metas1).into();
let summary2: proto::WriteSummary = WriteSummary::new(metas2).into();
let expected = proto::WriteSummary {
sequencers: vec![
proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2],
},
proto::SequencerWrite {
sequencer_id: 2,
sequence_numbers: vec![3],
},
],
};
assert_eq!(summary1, expected);
assert_eq!(summary2, expected);
}
#[test]
fn token_creation() {
let metas = vec![vec![make_meta(Sequence::new(1, 2))]];
let summary = WriteSummary::new(metas.clone());
let summary_copy = WriteSummary::new(metas);
let metas2 = vec![vec![make_meta(Sequence::new(2, 3))]];
let summary2 = WriteSummary::new(metas2);
let token = summary.to_token();
// non empty
assert!(!token.is_empty());
// same when created with same metas
assert_eq!(token, summary_copy.to_token());
// different when created with different metas
assert_ne!(token, summary2.to_token());
assert!(
!token.contains("sequenceNumbers"),
"token not obscured: {}",
token
);
assert!(
!token.contains("sequencers"),
"token not obscured: {}",
token
);
}
fn make_meta(s: Sequence) -> DmlMeta {
use time::TimeProvider;
let time_provider = time::SystemProvider::new();
let span_context = None;
let bytes_read = 132;
DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read)
}
}