chore(server): add logs for dropped WAL segments (#478)
* chore(server): add logs for dropped WAL segments Added logging for dropped writes and old segments in rollover scenarios Also including a dep on tracing and dev-dep on test_helpers Refs: #466 * chore(server): Add more context to logs Minor cleanup around remove_oldest_segment usage Suggestions from @alamb's reviewpull/24376/head
parent
cdb26e60e4
commit
7e2df1fc59
|
@ -435,6 +435,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"write_buffer",
|
||||
]
|
||||
|
||||
|
|
|
@ -17,8 +17,9 @@ influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
|||
query = { path = "../query" }
|
||||
write_buffer = { path = "../write_buffer" }
|
||||
object_store = { path = "../object_store" }
|
||||
tracing = "0.1"
|
||||
tokio = { version = "0.2", features = ["full"] }
|
||||
arrow_deps = { path = "../arrow_deps" }
|
||||
futures = "0.3.7"
|
||||
bytes = "0.5"
|
||||
chrono = "0.4"
|
||||
chrono = "0.4"
|
|
@ -11,6 +11,8 @@ use chrono::{DateTime, Utc};
|
|||
use snafu::Snafu;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use tracing::warn;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Max size limit hit {}", size))]
|
||||
|
@ -84,8 +86,20 @@ impl Buffer {
|
|||
}
|
||||
|
||||
match self.rollover_behavior {
|
||||
WalBufferRollover::DropIncoming => return Ok(None),
|
||||
WalBufferRollover::DropOldSegment => self.remove_oldest_segment(),
|
||||
WalBufferRollover::DropIncoming => {
|
||||
warn!(
|
||||
"WAL is full, dropping incoming write for current segment (segment id: {:?})",
|
||||
self.open_segment.id,
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
WalBufferRollover::DropOldSegment => {
|
||||
let oldest_segment_id = self.remove_oldest_segment();
|
||||
warn!(
|
||||
"WAL is full, dropping oldest segment (segment id: {:?})",
|
||||
oldest_segment_id
|
||||
);
|
||||
}
|
||||
WalBufferRollover::ReturnError => {
|
||||
return UnableToDropSegment {
|
||||
size: self.current_size,
|
||||
|
@ -185,10 +199,12 @@ impl Buffer {
|
|||
writes
|
||||
}
|
||||
|
||||
// Removes the oldest segment present in the buffer, returning its id
|
||||
#[allow(dead_code)]
|
||||
fn remove_oldest_segment(&mut self) {
|
||||
fn remove_oldest_segment(&mut self) -> u64 {
|
||||
let removed_segment = self.closed_segments.remove(0);
|
||||
self.current_size -= removed_segment.size;
|
||||
removed_segment.id
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue