WIP
parent
9bf18a4bf0
commit
131ac0fff6
|
@ -237,6 +237,10 @@ impl WalObjectStore {
|
||||||
)> {
|
)> {
|
||||||
let (wal_contents, responses, snapshot) = {
|
let (wal_contents, responses, snapshot) = {
|
||||||
let mut flush_buffer = self.flush_buffer.lock().await;
|
let mut flush_buffer = self.flush_buffer.lock().await;
|
||||||
|
if flush_buffer.wal_buffer.is_empty() && !force_snapshot {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
flush_buffer
|
flush_buffer
|
||||||
.flush_buffer_into_contents_and_responses(force_snapshot)
|
.flush_buffer_into_contents_and_responses(force_snapshot)
|
||||||
.await?
|
.await?
|
||||||
|
@ -482,55 +486,25 @@ impl FlushBuffer {
|
||||||
Vec<oneshot::Sender<WriteResult>>,
|
Vec<oneshot::Sender<WriteResult>>,
|
||||||
Option<(SnapshotDetails, OwnedSemaphorePermit)>,
|
Option<(SnapshotDetails, OwnedSemaphorePermit)>,
|
||||||
)> {
|
)> {
|
||||||
// whether wal buffer is empty or not, we can force snapshot. But when it's empty
|
if force_snapshot && self.wal_buffer.is_empty() {
|
||||||
// we need to add some WalOp to capture the snapshot details in wal file and
|
|
||||||
// currently WalOp::Noop is used
|
|
||||||
let forced_snapshot_details = if self.wal_buffer.is_empty() && force_snapshot {
|
|
||||||
debug!(">>> adding a no op with force snapshot details");
|
|
||||||
self.wal_buffer.add_no_op();
|
self.wal_buffer.add_no_op();
|
||||||
self.snapshot_tracker.snapshot(force_snapshot)
|
}
|
||||||
} else if force_snapshot {
|
|
||||||
// wal buffer isn't empty so just run a force snapshot
|
|
||||||
debug!(">>> adding force snapshot details");
|
|
||||||
self.snapshot_tracker.snapshot(force_snapshot)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
// convert into wal contents and responses and capture if a snapshot should be taken
|
// convert into wal contents and responses and capture if a snapshot should be taken
|
||||||
if !self.wal_buffer.is_empty() {
|
|
||||||
let (mut wal_contents, responses) = self.flush_buffer_with_responses(force_snapshot);
|
let (mut wal_contents, responses) = self.flush_buffer_with_responses(force_snapshot);
|
||||||
|
|
||||||
// NB: Add wal period only when it is not a forced snapshot. We'd have just emptied the
|
|
||||||
// snapshot tracker if it's a forced snapshot, the current wal content will also
|
|
||||||
// be added to parquet file when snapshotting so no need to add the wal period.
|
|
||||||
let forced_snapshot = forced_snapshot_details
|
|
||||||
.map(|details| details.forced)
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
let snapshot = if !forced_snapshot {
|
|
||||||
// if it's not forced snapshot add the wal period and snapshot straight after and
|
|
||||||
// if it happens to hit the 3 * snapshot size condition it'll empty the wal period
|
|
||||||
// that is just added (look into SnapshotTracker::snapshot for 3 * snapshot size
|
|
||||||
// details)
|
|
||||||
self.snapshot_tracker.add_wal_period(WalPeriod {
|
self.snapshot_tracker.add_wal_period(WalPeriod {
|
||||||
wal_file_number: wal_contents.wal_file_number,
|
wal_file_number: wal_contents.wal_file_number,
|
||||||
min_time: Timestamp::new(wal_contents.min_timestamp_ns),
|
min_time: Timestamp::new(wal_contents.min_timestamp_ns),
|
||||||
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
|
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
|
||||||
});
|
});
|
||||||
let snapshot_details = self.snapshot_tracker.snapshot(force_snapshot);
|
let snapshot_details = self.snapshot_tracker.snapshot(force_snapshot);
|
||||||
self.set_snapshot_details_and_acquite_semaphore(snapshot_details, &mut wal_contents)
|
|
||||||
.await
|
let snapshot = self
|
||||||
} else {
|
.set_snapshot_details_and_acquite_semaphore(snapshot_details, &mut wal_contents)
|
||||||
self.set_snapshot_details_and_acquite_semaphore(
|
.await;
|
||||||
forced_snapshot_details,
|
|
||||||
&mut wal_contents,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
};
|
|
||||||
|
|
||||||
return Some((wal_contents, responses, snapshot));
|
return Some((wal_contents, responses, snapshot));
|
||||||
};
|
|
||||||
|
|
||||||
// at this point there's nothing to write to wal file, no wal content with or without
|
// at this point there's nothing to write to wal file, no wal content with or without
|
||||||
// snapshot details
|
// snapshot details
|
||||||
|
|
Loading…
Reference in New Issue