Merge pull request #6274 from influxdata/dom/sequence-rpc-write
feat: sequence rpc writes
pull/24376/head

commit be726a8327
@@ -2469,6 +2469,7 @@ dependencies = [
  "async-trait",
  "backoff",
  "bytes",
+ "crossbeam-utils",
  "data_types",
  "datafusion",
  "datafusion_util",
@@ -12,6 +12,7 @@ arrow_util = { version = "0.1.0", path = "../arrow_util" }
 async-trait = "0.1.58"
 backoff = { version = "0.1.0", path = "../backoff" }
 bytes = "1.3.0"
+crossbeam-utils = "0.8.14"
 data_types = { version = "0.1.0", path = "../data_types" }
 datafusion.workspace = true
 datafusion_util = { path = "../datafusion_util" }
@@ -17,6 +17,7 @@ use crate::{
         BufferTree,
     },
     server::grpc::GrpcDelegate,
+    timestamp_oracle::TimestampOracle,
     TRANSITION_SHARD_ID,
 };
@@ -146,5 +147,10 @@ pub async fn new(
         metrics,
     ));

-    Ok(GrpcDelegate::new(Arc::clone(&buffer), buffer))
+    // TODO: replay WAL into buffer
+    //
+    // TODO: recover next sequence number from WAL
+    let timestamp = Arc::new(TimestampOracle::new(0));
+
+    Ok(GrpcDelegate::new(Arc::clone(&buffer), buffer, timestamp))
 }
@@ -15,11 +15,12 @@
     unreachable_pub
 )]

-use data_types::ShardId;
+use data_types::{ShardId, ShardIndex};

 /// During the testing of ingester2, the catalog will require a ShardId for
 /// various operations. This is a const value for these occasions.
 const TRANSITION_SHARD_ID: ShardId = ShardId::new(1);
+const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1);

 /// Ingester initialisation methods & types.
 ///
@@ -48,6 +49,7 @@ mod query;
 mod query_adaptor;
 mod sequence_range;
 pub(crate) mod server;
+mod timestamp_oracle;

 #[cfg(test)]
 mod test_util;
@@ -17,6 +17,7 @@ use crate::{
     dml_sink::DmlSink,
     init::IngesterRpcInterface,
     query::{response::QueryResponse, QueryExec},
+    timestamp_oracle::TimestampOracle,
 };

 use self::rpc_write::RpcWrite;
@@ -30,6 +31,7 @@ use self::rpc_write::RpcWrite;
 pub(crate) struct GrpcDelegate<D, Q> {
     dml_sink: Arc<D>,
     query_exec: Arc<Q>,
+    timestamp: Arc<TimestampOracle>,
 }

 impl<D, Q> GrpcDelegate<D, Q>
@@ -38,10 +40,15 @@ where
     Q: QueryExec<Response = QueryResponse> + 'static,
 {
     /// Initialise a new [`GrpcDelegate`].
-    pub(crate) fn new(dml_sink: Arc<D>, query_exec: Arc<Q>) -> Self {
+    pub(crate) fn new(
+        dml_sink: Arc<D>,
+        query_exec: Arc<Q>,
+        timestamp: Arc<TimestampOracle>,
+    ) -> Self {
         Self {
             dml_sink,
             query_exec,
+            timestamp,
         }
     }
 }
@@ -71,7 +78,10 @@ where
     ///
     /// [`WriteService`]: generated_types::influxdata::iox::catalog::v1::write_service_server::WriteService.
     fn write_service(&self) -> WriteServiceServer<Self::WriteHandler> {
-        WriteServiceServer::new(RpcWrite::new(Arc::clone(&self.dml_sink)))
+        WriteServiceServer::new(RpcWrite::new(
+            Arc::clone(&self.dml_sink),
+            Arc::clone(&self.timestamp),
+        ))
     }

     /// Return an Arrow [`FlightService`] gRPC implementation.
@@ -1,4 +1,6 @@
-use data_types::{NamespaceId, PartitionKey, TableId};
+use std::sync::Arc;
+
+use data_types::{NamespaceId, PartitionKey, Sequence, TableId};
 use dml::{DmlMeta, DmlOperation, DmlWrite};
 use generated_types::influxdata::iox::ingester::v1::{
     self as proto, write_service_server::WriteService,
@@ -9,7 +11,11 @@ use observability_deps::tracing::*;
 use thiserror::Error;
 use tonic::{Request, Response};

-use crate::dml_sink::{DmlError, DmlSink};
+use crate::{
+    dml_sink::{DmlError, DmlSink},
+    timestamp_oracle::TimestampOracle,
+    TRANSITION_SHARD_INDEX,
+};

 /// A list of error states when handling an RPC write request.
 ///
@@ -86,14 +92,15 @@ fn map_write_error(e: mutable_batch::Error) -> tonic::Status {
 #[derive(Debug)]
 pub(crate) struct RpcWrite<T> {
     sink: T,
+    timestamp: Arc<TimestampOracle>,
 }

 impl<T> RpcWrite<T> {
     /// Instantiate a new [`RpcWrite`] that pushes [`DmlOperation`] instances
     /// into `sink`.
     #[allow(dead_code)]
-    pub(crate) fn new(sink: T) -> Self {
-        Self { sink }
+    pub(crate) fn new(sink: T, timestamp: Arc<TimestampOracle>) -> Self {
+        Self { sink, timestamp }
     }
 }

@@ -142,10 +149,18 @@ where
                 .map(|(k, v)| (TableId::new(k), v))
                 .collect(),
             partition_key,
-            // The tracing context should be propagated over the RPC boundary.
-            //
-            // See https://github.com/influxdata/influxdb_iox/issues/6177
-            DmlMeta::unsequenced(None),
+            DmlMeta::sequenced(
+                Sequence {
+                    shard_index: TRANSITION_SHARD_INDEX, // TODO: remove this from DmlMeta
+                    sequence_number: self.timestamp.next(),
+                },
+                iox_time::Time::MAX, // TODO: remove this from DmlMeta
+                // The tracing context should be propagated over the RPC boundary.
+                //
+                // See https://github.com/influxdata/influxdb_iox/issues/6177
+                None,
+                42, // TODO: remove this from DmlMeta
+            ),
         );

         // Apply the DML op to the in-memory buffer.
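The hunk above is the core behavioural change of this PR: each incoming RPC write is stamped with the next value from the shared TimestampOracle before it is pushed into the DML sink. Below is a minimal, self-contained sketch of that pattern, not the crate's real API: Write, SequencedWrite, and Oracle are illustrative stand-ins for DmlWrite, DmlMeta, and TimestampOracle.

use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-ins for the crate's DmlWrite / DmlMeta types.
struct Write {
    payload: String,
}

struct SequencedWrite {
    sequence_number: u64,
    payload: String,
}

// A shared counter playing the role of the TimestampOracle: values start at
// `last_value + 1` and increase by one per call.
struct Oracle(AtomicU64);

impl Oracle {
    fn new(last_value: u64) -> Self {
        Self(AtomicU64::new(last_value + 1))
    }
    fn next(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Relaxed)
    }
}

// Mirrors the DmlMeta::sequenced(..) call site: stamp the write with the next
// sequence number before handing it to the sink.
fn sequence_write(oracle: &Oracle, w: Write) -> SequencedWrite {
    SequencedWrite {
        sequence_number: oracle.next(),
        payload: w.payload,
    }
}

fn main() {
    let oracle = Oracle::new(0);
    let first = sequence_write(&oracle, Write { payload: "cpu v=1".into() });
    let second = sequence_write(&oracle, Write { payload: "cpu v=2".into() });
    assert_eq!(first.sequence_number, 1);
    assert!(first.sequence_number < second.sequence_number);
    println!("sequenced {} ({}) then {} ({})",
        first.sequence_number, first.payload,
        second.sequence_number, second.payload);
}

Because the counter only moves forward, a write accepted after another always carries a larger sequence number, which is exactly what the test_rpc_write_ordered_timestamps test added further down asserts.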
@@ -191,7 +206,8 @@ mod tests {
                 let mock = Arc::new(
                     MockDmlSink::default().with_apply_return(vec![$sink_ret]),
                 );
-                let handler = RpcWrite::new(Arc::clone(&mock));
+                let timestamp = Arc::new(TimestampOracle::new(0));
+                let handler = RpcWrite::new(Arc::clone(&mock), timestamp);

                 let ret = handler
                     .write(Request::new($request))
@@ -238,6 +254,7 @@ mod tests {
             assert_eq!(w.namespace_id(), NAMESPACE_ID);
             assert_eq!(w.table_count(), 1);
             assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
+            assert_eq!(w.meta().sequence().unwrap().sequence_number.get(), 1);
         }
     );

@@ -294,4 +311,58 @@ mod tests {
         want_err = true,
         want_calls = []
     );
+
+    /// A property test asserting that writes that succeed earlier writes have
+    /// greater timestamps assigned.
+    #[tokio::test]
+    async fn test_rpc_write_ordered_timestamps() {
+        let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())]));
+        let timestamp = Arc::new(TimestampOracle::new(0));
+        let handler = RpcWrite::new(Arc::clone(&mock), timestamp);
+
+        let req = proto::WriteRequest {
+            payload: Some(DatabaseBatch {
+                database_id: NAMESPACE_ID.get(),
+                partition_key: PARTITION_KEY.to_string(),
+                table_batches: vec![TableBatch {
+                    table_id: 42,
+                    columns: vec![Column {
+                        column_name: "time".to_string(),
+                        semantic_type: SemanticType::Time.into(),
+                        values: Some(Values {
+                            i64_values: vec![4242],
+                            f64_values: vec![],
+                            u64_values: vec![],
+                            string_values: vec![],
+                            bool_values: vec![],
+                            bytes_values: vec![],
+                            packed_string_values: None,
+                            interned_string_values: None,
+                        }),
+                        null_mask: vec![0],
+                    }],
+                    row_count: 1,
+                }],
+            }),
+        };
+
+        handler
+            .write(Request::new(req.clone()))
+            .await
+            .expect("write should succeed");
+
+        handler
+            .write(Request::new(req))
+            .await
+            .expect("write should succeed");
+
+        assert_matches!(
+            *mock.get_calls(),
+            [DmlOperation::Write(ref w1), DmlOperation::Write(ref w2)] => {
+                let w1 = w1.meta().sequence().unwrap().sequence_number.get();
+                let w2 = w2.meta().sequence().unwrap().sequence_number.get();
+                assert!(w1 < w2);
+            }
+        );
+    }
 }
@@ -0,0 +1,111 @@
+//! A provider of ordered timestamps, exposed as a [`SequenceNumber`].
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use crossbeam_utils::CachePadded;
+use data_types::SequenceNumber;
+
+/// A concurrency-safe provider of totally ordered [`SequenceNumber`] values.
+///
+/// Given a single [`TimestampOracle`] instance, the [`SequenceNumber`] values
+/// returned by calling [`TimestampOracle::next()`] are guaranteed to be totally
+/// ordered and consecutive.
+///
+/// No ordering exists across independent [`TimestampOracle`] instances.
+#[derive(Debug)]
+pub(crate) struct TimestampOracle(CachePadded<AtomicU64>);
+
+impl TimestampOracle {
+    /// Construct a [`TimestampOracle`] that returns values starting from
+    /// `last_value + 1`.
+    pub(crate) fn new(last_value: u64) -> Self {
+        Self(CachePadded::new(AtomicU64::new(last_value + 1)))
+    }
+
+    /// Obtain the next [`SequenceNumber`].
+    pub(crate) fn next(&self) -> SequenceNumber {
+        // Correctness:
+        //
+        // A relaxed atomic store has a consistent modification order, with two
+        // racing threads calling fetch_add resolving into a defined ordering of
+        // one having called before the other. This ordering will never change
+        // or diverge between threads.
+        let v = self.0.fetch_add(1, Ordering::Relaxed);
+
+        SequenceNumber::new(v as i64)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+
+    /// Ensure the next value read from a newly initialised [`TimestampOracle`]
+    /// is always one greater than the init value.
+    ///
+    /// This ensures that a total ordering is derived even if a caller provides
+    /// the last actual value, or the next expected value (at the expense of a
+    /// possible gap in the sequence). This helps avoid off-by-one bugs in the
+    /// caller.
+    #[test]
+    fn test_init_next_val() {
+        let oracle = TimestampOracle::new(41);
+        assert_eq!(oracle.next().get(), 42);
+    }
+
+    /// A property test ensuring that for N threads competing to sequence M
+    /// operations, a total order of operations is derived from consecutive
+    /// timestamps returned by a single [`TimestampOracle`] instance.
+    #[test]
+    #[allow(clippy::needless_collect)]
+    fn test_concurrency() {
+        // The number of threads that will race to obtain N_SEQ each.
+        const N_THREADS: usize = 10;
+
+        // The number of SequenceNumber timestamps each thread will acquire.
+        const N_SEQ: usize = 100;
+
+        // The init value for the TimestampOracle
+        const LAST_VALUE: usize = 0;
+
+        // The total number of SequenceNumber to be acquired in this test.
+        const TOTAL_SEQ: usize = N_SEQ * N_THREADS;
+
+        let oracle = Arc::new(TimestampOracle::new(LAST_VALUE as u64));
+        let barrier = Arc::new(std::sync::Barrier::new(N_THREADS));
+
+        // Spawn the desired number of threads, synchronise their starting
+        // point, and then race them to obtain TOTAL_SEQ number of timestamps.
+        //
+        // Each thread returns an vector of the timestamp values it acquired.
+        let handles = (0..N_THREADS)
+            .map(|_| {
+                let oracle = Arc::clone(&oracle);
+                let barrier = Arc::clone(&barrier);
+                std::thread::spawn(move || {
+                    barrier.wait(); // synchronise start time
+                    (0..N_SEQ).map(|_| oracle.next().get()).collect::<Vec<_>>()
+                })
+            })
+            .collect::<Vec<_>>();
+
+        // Collect all the timestamps from all the threads
+        let mut timestamps = handles
+            .into_iter()
+            .flat_map(|h| h.join().expect("thread panic"))
+            .collect::<Vec<_>>();
+
+        // Sort the timestamps
+        timestamps.sort_unstable();
+
+        // Assert the complete set of values from the expected range appear,
+        // unduplicated, and totally ordered.
+        let expected = (LAST_VALUE + 1)..TOTAL_SEQ + 1;
+        timestamps
+            .into_iter()
+            .zip(expected)
+            .for_each(|(got, want)| assert_eq!(got, want as i64));
+    }
+}