docs: Updating wrapping, content, and grammar of comments
parent
c811bebdb7
commit
ab72c93a5e
|
@ -15,19 +15,18 @@ pub struct IngesterConfig {
|
|||
)]
|
||||
pub write_buffer_partition_range_end: i32,
|
||||
|
||||
/// The ingester will continue to pull data and buffer it from Kafka
|
||||
/// as long as it is below this size. If it hits this size it will pause
|
||||
/// ingest from Kafka until persistence goes below this threshold.
|
||||
/// The ingester will continue to pull data and buffer it from the write buffer as long as the
|
||||
/// ingester buffer is below this size. If the ingester buffer hits this size, ingest from the
|
||||
/// write buffer will pause until the ingester buffer goes below this threshold.
|
||||
#[clap(
|
||||
long = "--pause-ingest-size-bytes",
|
||||
env = "INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES"
|
||||
)]
|
||||
pub pause_ingest_size_bytes: usize,
|
||||
|
||||
/// Once the ingester crosses this threshold of data buffered across
|
||||
/// all sequencers, it will pick the largest partitions and persist
|
||||
/// them until it falls below this threshold. An ingester running in
|
||||
/// a steady state is expected to take up this much memory.
|
||||
/// Once the ingester crosses this threshold of data buffered across all sequencers, it will
|
||||
/// pick the largest partitions and persist them until it falls below this threshold. An
|
||||
/// ingester running in a steady state is expected to take up this much memory.
|
||||
#[clap(
|
||||
long = "--persist-memory-threshold-bytes",
|
||||
env = "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES"
|
||||
|
@ -43,10 +42,9 @@ pub struct IngesterConfig {
|
|||
)]
|
||||
pub persist_partition_size_threshold_bytes: usize,
|
||||
|
||||
/// If a partition has had data buffered for longer than this period of time
|
||||
/// it will be persisted. This puts an upper bound on how far back the
|
||||
/// ingester may need to read in Kafka on restart or recovery. The default value
|
||||
/// is 30 minutes (in seconds).
|
||||
/// If a partition has had data buffered for longer than this period of time, it will be
|
||||
/// persisted. This puts an upper bound on how far back the ingester may need to read from the
|
||||
/// write buffer on restart or recovery. The default value is 30 minutes (in seconds).
|
||||
#[clap(
|
||||
long = "--persist-partition-age-threshold-seconds",
|
||||
env = "INFLUXDB_IOX_PERSIST_PARTITION_AGE_THRESHOLD_SECONDS",
|
||||
|
|
|
@ -164,15 +164,15 @@ where
|
|||
O: DmlSink,
|
||||
T: TimeProvider,
|
||||
{
|
||||
/// Run the stream handler, consuming items from [`Stream`] and applying
|
||||
/// them to the [`DmlSink`].
|
||||
/// Run the stream handler, consuming items from the stream provided by the
|
||||
/// [`WriteBufferStreamHandler`] and applying them to the [`DmlSink`].
|
||||
///
|
||||
/// This method blocks until gracefully shutdown by cancelling the
|
||||
/// `shutdown` [`CancellationToken`]. Once cancelled, this handler will
|
||||
/// complete the current operation it is processing before this method
|
||||
/// returns.
|
||||
///
|
||||
/// # Panics
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if the input stream ends (yields a `None`).
|
||||
pub async fn run(mut self, shutdown: CancellationToken) {
|
||||
|
@ -183,8 +183,7 @@ where
|
|||
let mut reset_to_earliest_once = false;
|
||||
|
||||
loop {
|
||||
// Wait for a DML operation from the sequencer, or a graceful stop
|
||||
// signal.
|
||||
// Wait for a DML operation from the sequencer, or a graceful stop signal.
|
||||
let maybe_op = futures::select!(
|
||||
next = stream.next().fuse() => next,
|
||||
_ = shutdown_fut => {
|
||||
|
@ -493,12 +492,12 @@ mod tests {
|
|||
macro_rules! test_stream_handler {
|
||||
(
|
||||
$name:ident,
|
||||
stream_ops = $stream_ops:expr, // An ordered set of stream items to feed to the handler
|
||||
sink_rets = $sink_ret:expr, // An ordered set of values to return from the mock op sink
|
||||
stream_ops = $stream_ops:expr, // Ordered set of stream items to feed to the handler
|
||||
sink_rets = $sink_ret:expr, // Ordered set of values to return from the mock op sink
|
||||
want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
|
||||
// Optional set of ingest error metric label / values to assert
|
||||
want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
|
||||
want_sink = $($want_sink:tt)+ // A pattern to match against the calls made to the op sink
|
||||
want_sink = $($want_sink:tt)+ // Pattern to match against calls made to the op sink
|
||||
) => {
|
||||
paste::paste! {
|
||||
#[tokio::test]
|
||||
|
@ -797,8 +796,7 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
// An abnormal end to the steam causes a panic, rather than a silent stream
|
||||
// reader exit.
|
||||
// An abnormal end to the steam causes a panic, rather than a silent stream reader exit.
|
||||
#[tokio::test]
|
||||
#[should_panic(
|
||||
expected = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without \
|
||||
|
|
|
@ -136,10 +136,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
|
|||
///
|
||||
/// The [`dml::DmlMeta`] will be propagated where applicable
|
||||
///
|
||||
/// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations into a single batch.
|
||||
/// After this method returns the operation was actually written (i.e. it is NOT buffered any longer). You may use
|
||||
/// [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time expired), which can be
|
||||
/// helpful for controlled shutdown.
|
||||
/// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations
|
||||
/// into a single batch. After this method returns the operation was actually written (i.e. it
|
||||
/// is NOT buffered any longer). You may use [`flush`](Self::flush) to trigger an early
|
||||
/// submission (e.g. before some linger time expired), which can be helpful for controlled
|
||||
/// shutdown.
|
||||
///
|
||||
/// Returns the metadata that was written.
|
||||
async fn store_operation(
|
||||
|
@ -168,8 +169,9 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
|
|||
/// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) /
|
||||
/// [`store_lp`](Self::store_lp)).
|
||||
///
|
||||
/// This call is pending while outstanding data is being submitted and will return AFTER the flush completed.
|
||||
/// However you still need to poll the store operations to get the metadata for every write.
|
||||
/// This call is pending while outstanding data is being submitted and will return AFTER the
|
||||
/// flush completed. However you still need to poll the store operations to get the metadata
|
||||
/// for every write.
|
||||
async fn flush(&self) -> Result<(), WriteBufferError>;
|
||||
|
||||
/// Return type (like `"mock"` or `"kafka"`) of this writer.
|
||||
|
@ -183,10 +185,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
|
|||
pub trait WriteBufferStreamHandler: Sync + Send + Debug + 'static {
|
||||
/// Stream that produces DML operations.
|
||||
///
|
||||
/// Note that due to the mutable borrow, it is not possible to have multiple streams from the same
|
||||
/// [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and requested again, the last
|
||||
/// sequence number of the old streams will be the start sequence number for the new streams. If you want to
|
||||
/// prevent that either create a new [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
|
||||
/// Note that due to the mutable borrow, it is not possible to have multiple streams from the
|
||||
/// same [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and
|
||||
/// requested again, the last sequence number of the old streams will be the start sequence
|
||||
/// number for the new streams. If you want to prevent that either create a new
|
||||
/// [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
|
||||
///
|
||||
/// If the sequence number that the stream wants to read is unknown (either because it is in
|
||||
/// the future or because some retention policy removed it already), the stream will return an
|
||||
|
@ -252,8 +255,9 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
|
|||
|
||||
/// Get high watermark (= what we believe is the next sequence number to be added).
|
||||
///
|
||||
/// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to be added", it starts
|
||||
/// at 0 and after the entry with sequence number 0 is added to the buffer, it is 1.
|
||||
/// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to
|
||||
/// be added", it starts at 0 and after the entry with sequence number 0 is added to the
|
||||
/// buffer, it is 1.
|
||||
async fn fetch_high_watermark(&self, sequencer_id: u32) -> Result<u64, WriteBufferError>;
|
||||
|
||||
/// Return type (like `"mock"` or `"kafka"`) of this reader.
|
||||
|
@ -294,8 +298,8 @@ pub mod test_utils {
|
|||
|
||||
/// Create a new context.
|
||||
///
|
||||
/// This will be called multiple times during the test suite. Each resulting context must represent an isolated
|
||||
/// environment.
|
||||
/// This will be called multiple times during the test suite. Each resulting context must
|
||||
/// represent an isolated environment.
|
||||
async fn new_context(&self, n_sequencers: NonZeroU32) -> Self::Context {
|
||||
self.new_context_with_time(n_sequencers, Arc::new(iox_time::SystemProvider::new()))
|
||||
.await
|
||||
|
@ -310,7 +314,8 @@ pub mod test_utils {
|
|||
|
||||
/// Context used during testing.
|
||||
///
|
||||
/// Represents an isolated environment. Actions like sequencer creations and writes must not leak across context boundaries.
|
||||
/// Represents an isolated environment. Actions like sequencer creations and writes must not
|
||||
/// leak across context boundaries.
|
||||
#[async_trait]
|
||||
pub trait TestContext: Send + Sync {
|
||||
/// Write buffer writer implementation specific to this context and adapter.
|
||||
|
@ -331,10 +336,11 @@ pub mod test_utils {
|
|||
|
||||
/// Generic test suite that must be passed by all proper write buffer implementations.
|
||||
///
|
||||
/// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this test suite.
|
||||
/// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this
|
||||
/// test suite.
|
||||
///
|
||||
/// Note that you might need more tests on top of this to assert specific implementation behaviors, edge cases, and
|
||||
/// error handling.
|
||||
/// Note that you might need more tests on top of this to assert specific implementation
|
||||
/// behaviors, edge cases, and error handling.
|
||||
pub async fn perform_generic_tests<T>(adapter: T)
|
||||
where
|
||||
T: TestAdapter,
|
||||
|
@ -387,6 +393,7 @@ pub mod test_utils {
|
|||
/// Test IO with a single writer and single reader stream.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - streams process data in order
|
||||
/// - readers can handle the "pending" state w/o erroring
|
||||
/// - readers unblock after being in "pending" state
|
||||
|
@ -431,6 +438,7 @@ pub mod test_utils {
|
|||
/// Tests multiple subsequently created streams from a single [`WriteBufferStreamHandler`].
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - readers remember their sequence number (and "pending" state) even when streams are dropped
|
||||
/// - state is not shared between handlers
|
||||
async fn test_multi_stream_io<T>(adapter: &T)
|
||||
|
@ -458,8 +466,8 @@ pub mod test_utils {
|
|||
let mut stream = stream_handler.stream().await;
|
||||
assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w1);
|
||||
|
||||
// re-creating stream after reading remembers sequence number, but wait a bit to provoke the stream to buffer
|
||||
// some entries
|
||||
// re-creating stream after reading remembers sequence number, but wait a bit to provoke
|
||||
// the stream to buffer some entries
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
drop(stream);
|
||||
let mut stream = stream_handler.stream().await;
|
||||
|
@ -481,7 +489,9 @@ pub mod test_utils {
|
|||
/// Test single reader-writer IO w/ multiple sequencers.
|
||||
///
|
||||
/// This tests that:
|
||||
/// - writes go to and reads come from the right sequencer, aka that sequencers provide a namespace-like isolation
|
||||
///
|
||||
/// - writes go to and reads come from the right sequencer, aka that sequencers provide a
|
||||
/// namespace-like isolation
|
||||
/// - "pending" states are specific to a sequencer
|
||||
async fn test_multi_sequencer_io<T>(adapter: &T)
|
||||
where
|
||||
|
@ -533,6 +543,7 @@ pub mod test_utils {
|
|||
/// Test multiple multiple writers and multiple readers on multiple sequencers.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - writers retrieve consistent sequencer IDs
|
||||
/// - writes go to and reads come from the right sequencer, similar
|
||||
/// to [`test_multi_sequencer_io`] but less detailed
|
||||
|
@ -577,6 +588,7 @@ pub mod test_utils {
|
|||
/// Test seek implemention of readers.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - seeking is specific to the reader AND sequencer
|
||||
/// - forward and backwards seeking works
|
||||
/// - seeking past the end of the known content works (results in "pending" status and
|
||||
|
@ -630,7 +642,8 @@ pub mod test_utils {
|
|||
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
|
||||
|
||||
// seek to far end and then add data
|
||||
// The affected stream should error and then stop. The other streams should still be pending.
|
||||
// The affected stream should error and then stop. The other streams should still be
|
||||
// pending.
|
||||
handler_1_1_a.seek(1_000_000).await.unwrap();
|
||||
let w_east_3 = write("namespace", &writer, entry_east_3, 0, None).await;
|
||||
|
||||
|
@ -658,6 +671,7 @@ pub mod test_utils {
|
|||
/// Test watermark fetching.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - watermarks for empty sequencers is 0
|
||||
/// - watermarks for non-empty sequencers is "last sequence ID plus 1"
|
||||
async fn test_watermark<T>(adapter: &T)
|
||||
|
@ -743,6 +757,7 @@ pub mod test_utils {
|
|||
/// Test that sequencer auto-creation works.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - both writer and reader cannot be constructed when sequencers are missing
|
||||
/// - both writer and reader can be auto-create sequencers
|
||||
async fn test_sequencer_auto_creation<T>(adapter: &T)
|
||||
|
@ -770,6 +785,7 @@ pub mod test_utils {
|
|||
/// Test sequencer IDs reporting of readers and writers.
|
||||
///
|
||||
/// This tests that:
|
||||
///
|
||||
/// - all sequencers are reported
|
||||
async fn test_sequencer_ids<T>(adapter: &T)
|
||||
where
|
||||
|
@ -885,6 +901,7 @@ pub mod test_utils {
|
|||
/// Test usage w/ multiple namespaces.
|
||||
///
|
||||
/// Tests that:
|
||||
///
|
||||
/// - namespace names or propagated correctly from writer to reader
|
||||
/// - all namespaces end up in a single stream
|
||||
async fn test_multi_namespaces<T>(adapter: &T)
|
||||
|
@ -949,14 +966,16 @@ pub mod test_utils {
|
|||
|
||||
/// Assert that the content of the reader is as expected.
|
||||
///
|
||||
/// This will read `expected_writes.len()` from the reader and then ensures that the stream is pending.
|
||||
/// This will read `expected_writes.len()` from the reader and then ensures that the stream is
|
||||
/// pending.
|
||||
async fn assert_reader_content(
|
||||
actual_stream_handler: &mut Box<dyn WriteBufferStreamHandler>,
|
||||
expected_writes: &[&DmlWrite],
|
||||
) {
|
||||
let actual_stream = actual_stream_handler.stream().await;
|
||||
|
||||
// we need to limit the stream to `expected_writes.len()` elements, otherwise it might be pending forever
|
||||
// we need to limit the stream to `expected_writes.len()` elements, otherwise it might be
|
||||
// pending forever
|
||||
let actual_writes: Vec<_> = actual_stream
|
||||
.take(expected_writes.len())
|
||||
.try_collect()
|
||||
|
@ -976,6 +995,7 @@ pub mod test_utils {
|
|||
/// Asserts that given span context are the same or that `second` links back to `first`.
|
||||
///
|
||||
/// "Same" means:
|
||||
///
|
||||
/// - identical trace ID
|
||||
/// - identical span ID
|
||||
/// - identical parent span ID
|
||||
|
@ -1004,7 +1024,8 @@ pub mod test_utils {
|
|||
assert_eq!(first.parent_span_id, second.parent_span_id);
|
||||
}
|
||||
|
||||
/// Assert that all span relations (parents, links) are found within the set of spans or within the set of roots.
|
||||
/// Assert that all span relations (parents, links) are found within the set of spans or within
|
||||
/// the set of roots.
|
||||
fn assert_span_relations_closed(spans: &[Span], roots: &[SpanContext]) {
|
||||
let all_ids: HashSet<_> = spans
|
||||
.iter()
|
||||
|
@ -1024,7 +1045,8 @@ pub mod test_utils {
|
|||
|
||||
/// Assert that given stream is pending.
|
||||
///
|
||||
/// This will will try to poll the stream for a bit to ensure that async IO has a chance to catch up.
|
||||
/// This will will try to poll the stream for a bit to ensure that async IO has a chance to
|
||||
/// catch up.
|
||||
async fn assert_stream_pending<S>(stream: &mut S)
|
||||
where
|
||||
S: Stream + Send + Unpin,
|
||||
|
@ -1084,8 +1106,8 @@ pub mod test_utils {
|
|||
}
|
||||
(false, None) => {
|
||||
eprintln!(
|
||||
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
|
||||
run"
|
||||
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT \
|
||||
to run"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -124,6 +124,7 @@ impl MockBufferSharedState {
|
|||
/// Push a new entry to the specified sequencer.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - when write is not sequenced
|
||||
/// - when no sequencer was initialized
|
||||
/// - when specified sequencer does not exist
|
||||
|
@ -135,6 +136,7 @@ impl MockBufferSharedState {
|
|||
/// Push a new operation to the specified sequencer
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - when operation is not sequenced
|
||||
/// - when no sequencer was initialized
|
||||
/// - when specified sequencer does not exist
|
||||
|
|
Loading…
Reference in New Issue