From ab72c93a5e7130fa466ccdceebbe4f6f5ef02791 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 19 May 2022 11:29:53 -0400
Subject: [PATCH] docs: Updating wrapping, content, and grammar of comments

---
 clap_blocks/src/ingester.rs            | 20 +++----
 ingester/src/stream_handler/handler.rs | 18 +++---
 write_buffer/src/core.rs               | 78 +++++++++++++++++---------
 write_buffer/src/mock.rs               |  2 +
 4 files changed, 69 insertions(+), 49 deletions(-)

diff --git a/clap_blocks/src/ingester.rs b/clap_blocks/src/ingester.rs
index 1132cb14dd..40df44e6c1 100644
--- a/clap_blocks/src/ingester.rs
+++ b/clap_blocks/src/ingester.rs
@@ -15,19 +15,18 @@ pub struct IngesterConfig {
     )]
     pub write_buffer_partition_range_end: i32,
 
-    /// The ingester will continue to pull data and buffer it from Kafka
-    /// as long as it is below this size. If it hits this size it will pause
-    /// ingest from Kafka until persistence goes below this threshold.
+    /// The ingester will continue to pull data and buffer it from the write buffer as long as the
+    /// ingester buffer is below this size. If the ingester buffer hits this size, ingest from the
+    /// write buffer will pause until the ingester buffer goes below this threshold.
     #[clap(
         long = "--pause-ingest-size-bytes",
         env = "INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES"
     )]
     pub pause_ingest_size_bytes: usize,
 
-    /// Once the ingester crosses this threshold of data buffered across
-    /// all sequencers, it will pick the largest partitions and persist
-    /// them until it falls below this threshold. An ingester running in
-    /// a steady state is expected to take up this much memory.
+    /// Once the ingester crosses this threshold of data buffered across all sequencers, it will
+    /// pick the largest partitions and persist them until it falls below this threshold. An
+    /// ingester running in a steady state is expected to take up this much memory.
     #[clap(
         long = "--persist-memory-threshold-bytes",
         env = "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES"
@@ -43,10 +42,9 @@ pub struct IngesterConfig {
     )]
     pub persist_partition_size_threshold_bytes: usize,
 
-    /// If a partition has had data buffered for longer than this period of time
-    /// it will be persisted. This puts an upper bound on how far back the
-    /// ingester may need to read in Kafka on restart or recovery. The default value
-    /// is 30 minutes (in seconds).
+    /// If a partition has had data buffered for longer than this period of time, it will be
+    /// persisted. This puts an upper bound on how far back the ingester may need to read from the
+    /// write buffer on restart or recovery. The default value is 30 minutes (in seconds).
    #[clap(
        long = "--persist-partition-age-threshold-seconds",
        env = "INFLUXDB_IOX_PERSIST_PARTITION_AGE_THRESHOLD_SECONDS",
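
As a reading aid for the two sizes above: ingest pauses only at the hard `pause_ingest_size_bytes` cap, while persistence is driven by the lower `persist_memory_threshold_bytes` target. A minimal sketch of that interaction, with illustrative names that are not from the IOx codebase:

    /// Illustrative only; not from the IOx codebase.
    struct BufferGauge {
        /// Bytes currently buffered across all sequencers.
        buffered_bytes: usize,
        /// Hard cap: ingest from the write buffer pauses at or above this.
        pause_ingest_size_bytes: usize,
        /// Steady-state target: persistence starts at or above this.
        persist_memory_threshold_bytes: usize,
    }

    impl BufferGauge {
        fn should_pause_ingest(&self) -> bool {
            self.buffered_bytes >= self.pause_ingest_size_bytes
        }

        fn should_persist(&self) -> bool {
            self.buffered_bytes >= self.persist_memory_threshold_bytes
        }
    }

    fn main() {
        let g = BufferGauge {
            buffered_bytes: 750,
            pause_ingest_size_bytes: 1000,
            persist_memory_threshold_bytes: 500,
        };
        // In steady state, persistence runs while ingest keeps flowing.
        assert!(g.should_persist());
        assert!(!g.should_pause_ingest());
    }

The gap between the two thresholds is what allows a steady state: persistence normally holds usage near the lower bound, and the pause cap is only hit when persistence cannot keep up.
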
diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs
index 1786fb7bb0..ab3ab57c9f 100644
--- a/ingester/src/stream_handler/handler.rs
+++ b/ingester/src/stream_handler/handler.rs
@@ -164,15 +164,15 @@ where
     O: DmlSink,
     T: TimeProvider,
 {
-    /// Run the stream handler, consuming items from [`Stream`] and applying
-    /// them to the [`DmlSink`].
+    /// Run the stream handler, consuming items from the stream provided by the
+    /// [`WriteBufferStreamHandler`] and applying them to the [`DmlSink`].
     ///
     /// This method blocks until gracefully shutdown by cancelling the
     /// `shutdown` [`CancellationToken`]. Once cancelled, this handler will
     /// complete the current operation it is processing before this method
     /// returns.
     ///
-    /// # Panics 
+    /// # Panics
     ///
     /// This method panics if the input stream ends (yields a `None`).
     pub async fn run(mut self, shutdown: CancellationToken) {
@@ -183,8 +183,7 @@ where
         let mut reset_to_earliest_once = false;
 
         loop {
-            // Wait for a DML operation from the sequencer, or a graceful stop
-            // signal.
+            // Wait for a DML operation from the sequencer, or a graceful stop signal.
             let maybe_op = futures::select!(
                 next = stream.next().fuse() => next,
                 _ = shutdown_fut => {
@@ -493,12 +492,12 @@ mod tests {
     macro_rules! test_stream_handler {
         (
             $name:ident,
-            stream_ops = $stream_ops:expr,  // An ordered set of stream items to feed to the handler
-            sink_rets = $sink_ret:expr,     // An ordered set of values to return from the mock op sink
+            stream_ops = $stream_ops:expr,  // Ordered set of stream items to feed to the handler
+            sink_rets = $sink_ret:expr,     // Ordered set of values to return from the mock op sink
             want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
             // Optional set of ingest error metric label / values to assert
             want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
-            want_sink = $($want_sink:tt)+   // A pattern to match against the calls made to the op sink
+            want_sink = $($want_sink:tt)+   // Pattern to match against calls made to the op sink
         ) => {
             paste::paste! {
                 #[tokio::test]
@@ -797,8 +796,7 @@ mod tests {
         }
     }
 
-    // An abnormal end to the steam causes a panic, rather than a silent stream
-    // reader exit.
+    // An abnormal end to the stream causes a panic, rather than a silent stream reader exit.
    #[tokio::test]
    #[should_panic(
        expected = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without \
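
The shutdown behavior documented above (finish the in-flight operation, then return; panic on abnormal stream end) boils down to a `select!` loop over the stream and a fused cancellation future. A self-contained sketch of that shape, with stand-in types; the real handler also records metrics, tracks watermarks, and applies each operation to the `DmlSink`:

    use futures::{FutureExt, StreamExt};
    use tokio_util::sync::CancellationToken;

    /// Illustrative only; a stand-in for the real stream handler.
    async fn run<S>(mut stream: S, shutdown: CancellationToken)
    where
        S: futures::Stream<Item = String> + Unpin,
    {
        // Fuse the shutdown future once so `select!` can poll it repeatedly.
        let shutdown_fut = shutdown.cancelled().fuse();
        futures::pin_mut!(shutdown_fut);

        loop {
            // Wait for the next operation, or a graceful stop signal.
            let maybe_op = futures::select!(
                next = stream.next().fuse() => next,
                // Any in-flight operation already completed before this arm
                // runs, so returning here is the graceful path.
                _ = shutdown_fut => return,
            );

            match maybe_op {
                Some(op) => println!("apply to sink: {}", op),
                // An abnormal end to the stream is a panic, not a silent exit.
                None => panic!("input stream ended without graceful shutdown"),
            }
        }
    }
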
diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs
index 16e7746d89..a9cf3d5278 100644
--- a/write_buffer/src/core.rs
+++ b/write_buffer/src/core.rs
@@ -136,10 +136,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
     ///
     /// The [`dml::DmlMeta`] will be propagated where applicable
     ///
-    /// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations into a single batch.
-    /// After this method returns the operation was actually written (i.e. it is NOT buffered any longer). You may use
-    /// [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time expired), which can be
-    /// helpful for controlled shutdown.
+    /// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations
+    /// into a single batch. After this method returns, the operation was actually written (i.e. it
+    /// is NOT buffered any longer). You may use [`flush`](Self::flush) to trigger an early
+    /// submission (e.g. before some linger time expired), which can be helpful for controlled
+    /// shutdown.
     ///
     /// Returns the metadata that was written.
     async fn store_operation(
@@ -168,8 +169,9 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
     /// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) /
     /// [`store_lp`](Self::store_lp)).
     ///
-    /// This call is pending while outstanding data is being submitted and will return AFTER the flush completed.
-    /// However you still need to poll the store operations to get the metadata for every write.
+    /// This call is pending while outstanding data is being submitted and will return AFTER the
+    /// flush completed. However, you still need to poll the store operations to get the metadata
+    /// for every write.
     async fn flush(&self) -> Result<(), WriteBufferError>;
 
     /// Return type (like `"mock"` or `"kafka"`) of this writer.
@@ -183,10 +185,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
 pub trait WriteBufferStreamHandler: Sync + Send + Debug + 'static {
     /// Stream that produces DML operations.
     ///
-    /// Note that due to the mutable borrow, it is not possible to have multiple streams from the same
-    /// [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and requested again, the last
-    /// sequence number of the old streams will be the start sequence number for the new streams. If you want to
-    /// prevent that either create a new [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
+    /// Note that due to the mutable borrow, it is not possible to have multiple streams from the
+    /// same [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and
+    /// requested again, the last sequence number of the old streams will be the start sequence
+    /// number for the new streams. If you want to prevent that, either create a new
+    /// [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
     ///
     /// If the sequence number that the stream wants to read is unknown (either because it is in
     /// the future or because some retention policy removed it already), the stream will return an
@@ -252,8 +255,9 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
 
     /// Get high watermark (= what we believe is the next sequence number to be added).
     ///
-    /// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to be added", it starts
-    /// at 0 and after the entry with sequence number 0 is added to the buffer, it is 1.
+    /// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to
+    /// be added", it starts at 0 and after the entry with sequence number 0 is added to the
+    /// buffer, it is 1.
     async fn fetch_high_watermark(&self, sequencer_id: u32) -> Result<u64, WriteBufferError>;
 
     /// Return type (like `"mock"` or `"kafka"`) of this reader.
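
The lag calculation the `fetch_high_watermark` docs allude to follows directly from the watermark definition. A sketch, with hypothetical names:

    /// Illustrative only. The watermark is the *next* sequence number to be
    /// added, so an empty sequencer has watermark 0 and a caught-up reader
    /// has zero lag.
    fn lag(high_watermark: u64, last_seen_sequence_number: Option<u64>) -> u64 {
        let next_to_read = match last_seen_sequence_number {
            Some(n) => n + 1,
            None => 0,
        };
        high_watermark.saturating_sub(next_to_read)
    }

    fn main() {
        assert_eq!(lag(0, None), 0); // empty sequencer, fresh reader
        assert_eq!(lag(1, Some(0)), 0); // caught up after entry 0
        assert_eq!(lag(10, Some(4)), 5); // entries 5..=9 still unread
    }
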
@@ -294,8 +298,8 @@ pub mod test_utils {
 
         /// Create a new context.
         ///
-        /// This will be called multiple times during the test suite. Each resulting context must represent an isolated
-        /// environment.
+        /// This will be called multiple times during the test suite. Each resulting context must
+        /// represent an isolated environment.
         async fn new_context(&self, n_sequencers: NonZeroU32) -> Self::Context {
             self.new_context_with_time(n_sequencers, Arc::new(iox_time::SystemProvider::new()))
                 .await
@@ -310,7 +314,8 @@ pub mod test_utils {
 
     /// Context used during testing.
     ///
-    /// Represents an isolated environment. Actions like sequencer creations and writes must not leak across context boundaries.
+    /// Represents an isolated environment. Actions like sequencer creations and writes must not
+    /// leak across context boundaries.
     #[async_trait]
     pub trait TestContext: Send + Sync {
         /// Write buffer writer implementation specific to this context and adapter.
@@ -331,10 +336,11 @@ pub mod test_utils {
 
     /// Generic test suite that must be passed by all proper write buffer implementations.
     ///
-    /// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this test suite.
+    /// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this
+    /// test suite.
     ///
-    /// Note that you might need more tests on top of this to assert specific implementation behaviors, edge cases, and
-    /// error handling.
+    /// Note that you might need more tests on top of this to assert specific implementation
+    /// behaviors, edge cases, and error handling.
     pub async fn perform_generic_tests<T>(adapter: T)
     where
         T: TestAdapter,
@@ -387,6 +393,7 @@ pub mod test_utils {
     /// Test IO with a single writer and single reader stream.
     ///
     /// This tests that:
+    ///
     /// - streams process data in order
     /// - readers can handle the "pending" state w/o erroring
     /// - readers unblock after being in "pending" state
@@ -431,6 +438,7 @@ pub mod test_utils {
     /// Tests multiple subsequently created streams from a single [`WriteBufferStreamHandler`].
     ///
     /// This tests that:
+    ///
     /// - readers remember their sequence number (and "pending" state) even when streams are dropped
     /// - state is not shared between handlers
     async fn test_multi_stream_io<T>(adapter: &T)
     where
@@ -458,8 +466,8 @@ pub mod test_utils {
         let mut stream = stream_handler.stream().await;
         assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w1);
 
-        // re-creating stream after reading remembers sequence number, but wait a bit to provoke the stream to buffer
-        // some entries
+        // re-creating stream after reading remembers sequence number, but wait a bit to provoke
+        // the stream to buffer some entries
         tokio::time::sleep(Duration::from_millis(10)).await;
         drop(stream);
         let mut stream = stream_handler.stream().await;
@@ -481,7 +489,9 @@ pub mod test_utils {
     /// Test single reader-writer IO w/ multiple sequencers.
     ///
     /// This tests that:
-    /// - writes go to and reads come from the right sequencer, aka that sequencers provide a namespace-like isolation
+    ///
+    /// - writes go to and reads come from the right sequencer, aka that sequencers provide a
+    ///   namespace-like isolation
     /// - "pending" states are specific to a sequencer
     async fn test_multi_sequencer_io<T>(adapter: &T)
     where
@@ -533,6 +543,7 @@ pub mod test_utils {
     /// Test multiple multiple writers and multiple readers on multiple sequencers.
     ///
     /// This tests that:
+    ///
     /// - writers retrieve consistent sequencer IDs
     /// - writes go to and reads come from the right sequencer, similar
     ///   to [`test_multi_sequencer_io`] but less detailed
@@ -577,6 +588,7 @@ pub mod test_utils {
     /// Test seek implemention of readers.
     ///
     /// This tests that:
+    ///
     /// - seeking is specific to the reader AND sequencer
     /// - forward and backwards seeking works
     /// - seeking past the end of the known content works (results in "pending" status and
@@ -630,7 +642,8 @@ pub mod test_utils {
         assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
 
         // seek to far end and then add data
-        // The affected stream should error and then stop. The other streams should still be pending.
+        // The affected stream should error and then stop. The other streams should still be
+        // pending.
         handler_1_1_a.seek(1_000_000).await.unwrap();
         let w_east_3 = write("namespace", &writer, entry_east_3, 0, None).await;
@@ -658,6 +671,7 @@ pub mod test_utils {
     /// Test watermark fetching.
     ///
     /// This tests that:
+    ///
     /// - watermarks for empty sequencers is 0
     /// - watermarks for non-empty sequencers is "last sequence ID plus 1"
     async fn test_watermark<T>(adapter: &T)
     where
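
The seek contract that `test_seek` above exercises can be pictured with a toy in-memory reader. This is illustrative only; the real streams additionally return an error for unknown sequence numbers (e.g. past the end, or already removed by retention) before going pending:

    /// Illustrative only; none of these types are from the IOx codebase.
    struct ToyReader {
        /// (sequence number, payload), in ascending sequence order.
        log: Vec<(u64, &'static str)>,
        cursor: u64,
    }

    impl ToyReader {
        /// Forward and backward seeks both just move the cursor.
        fn seek(&mut self, sequence_number: u64) {
            self.cursor = sequence_number;
        }

        /// Next entry at or after the cursor; `None` models the "pending"
        /// state a stream enters once it is past everything known.
        fn poll_next(&mut self) -> Option<&'static str> {
            let (seq, payload) = *self.log.iter().find(|(s, _)| *s >= self.cursor)?;
            self.cursor = seq + 1;
            Some(payload)
        }
    }

    fn main() {
        let mut r = ToyReader { log: vec![(0, "w1"), (1, "w2")], cursor: 0 };
        assert_eq!(r.poll_next(), Some("w1"));
        r.seek(0); // backward seek: the same entry is replayed
        assert_eq!(r.poll_next(), Some("w1"));
        r.seek(1_000_000); // past the end: "pending" until new data arrives
        assert_eq!(r.poll_next(), None);
    }
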
@@ -743,6 +757,7 @@ pub mod test_utils {
     /// Test that sequencer auto-creation works.
     ///
     /// This tests that:
+    ///
     /// - both writer and reader cannot be constructed when sequencers are missing
     /// - both writer and reader can be auto-create sequencers
     async fn test_sequencer_auto_creation<T>(adapter: &T)
     where
@@ -770,6 +785,7 @@ pub mod test_utils {
     /// Test sequencer IDs reporting of readers and writers.
     ///
     /// This tests that:
+    ///
     /// - all sequencers are reported
     async fn test_sequencer_ids<T>(adapter: &T)
     where
@@ -885,6 +901,7 @@ pub mod test_utils {
     /// Test usage w/ multiple namespaces.
     ///
     /// Tests that:
+    ///
     /// - namespace names or propagated correctly from writer to reader
     /// - all namespaces end up in a single stream
     async fn test_multi_namespaces<T>(adapter: &T)
     where
@@ -949,14 +966,16 @@ pub mod test_utils {
     /// Assert that the content of the reader is as expected.
     ///
-    /// This will read `expected_writes.len()` from the reader and then ensures that the stream is pending.
+    /// This will read `expected_writes.len()` from the reader and then ensures that the stream is
+    /// pending.
     async fn assert_reader_content(
         actual_stream_handler: &mut Box<dyn WriteBufferStreamHandler>,
         expected_writes: &[&DmlWrite],
     ) {
         let actual_stream = actual_stream_handler.stream().await;
 
-        // we need to limit the stream to `expected_writes.len()` elements, otherwise it might be pending forever
+        // we need to limit the stream to `expected_writes.len()` elements, otherwise it might be
+        // pending forever
         let actual_writes: Vec<_> = actual_stream
             .take(expected_writes.len())
             .try_collect()
             .await
             .unwrap();
@@ -976,6 +995,7 @@ pub mod test_utils {
     /// Asserts that given span context are the same or that `second` links back to `first`.
     ///
     /// "Same" means:
+    ///
     /// - identical trace ID
     /// - identical span ID
     /// - identical parent span ID
@@ -1004,7 +1024,8 @@ pub mod test_utils {
         assert_eq!(first.parent_span_id, second.parent_span_id);
     }
 
-    /// Assert that all span relations (parents, links) are found within the set of spans or within the set of roots.
+    /// Assert that all span relations (parents, links) are found within the set of spans or within
+    /// the set of roots.
     fn assert_span_relations_closed(spans: &[Span], roots: &[SpanContext]) {
         let all_ids: HashSet<_> = spans
             .iter()
@@ -1024,7 +1045,8 @@ pub mod test_utils {
 
     /// Assert that given stream is pending.
     ///
-    /// This will will try to poll the stream for a bit to ensure that async IO has a chance to catch up.
+    /// This will try to poll the stream for a bit to ensure that async IO has a chance to
+    /// catch up.
     async fn assert_stream_pending<S>(stream: &mut S)
     where
         S: Stream + Send + Unpin,
@@ -1084,8 +1106,8 @@ pub mod test_utils {
             }
             (false, None) => {
                 eprintln!(
-                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
-                     run"
+                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT \
+                     to run"
                 );
                 return;
             }
diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs
index af5e8d7bb7..d595670657 100644
--- a/write_buffer/src/mock.rs
+++ b/write_buffer/src/mock.rs
@@ -124,6 +124,7 @@ impl MockBufferSharedState {
     /// Push a new entry to the specified sequencer.
     ///
     /// # Panics
+    ///
     /// - when write is not sequenced
     /// - when no sequencer was initialized
     /// - when specified sequencer does not exist
@@ -135,6 +136,7 @@ impl MockBufferSharedState {
     /// Push a new operation to the specified sequencer
     ///
     /// # Panics
+    ///
     /// - when operation is not sequenced
     /// - when no sequencer was initialized
     /// - when specified sequencer does not exist
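
For comparison with the `assert_stream_pending` helper in core.rs above, the same check can be phrased with a timeout instead of a manual poll loop. A sketch, not the implementation in this patch:

    use std::time::Duration;

    use futures::{Stream, StreamExt};

    /// Assert that a stream yields nothing within a short grace period.
    /// Illustrative only; the 100ms grace period is an assumption.
    async fn assert_pending_via_timeout<S>(stream: &mut S)
    where
        S: Stream + Unpin,
        S::Item: std::fmt::Debug,
    {
        match tokio::time::timeout(Duration::from_millis(100), stream.next()).await {
            // Timed out without an item: still pending, as expected.
            Err(_elapsed) => {}
            // An item (or `None`, i.e. stream end) means it was not pending.
            Ok(item) => panic!("stream was not pending; yielded {:?}", item),
        }
    }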