diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs index 13ed16d849..630e2749e8 100644 --- a/write_buffer/src/file.rs +++ b/write_buffer/src/file.rs @@ -9,38 +9,38 @@ //! //! ```text //! // -//! /cur | Location of current state +//! /active | Location of current state //! : | //! : | //! : | symlink //! : | -//! : +----+ -//! : | -//! : | -//! : | -//! /pad/ V -//! // -//! : /0/ -//! : : /cur/0 \ -//! : : : /1 | Message files -//! : : : /2 | (finished) -//! : : : ... / -//! : : : -//! : : : -//! : : /pad/ \ -//! : : / | Message files -//! : : / | (to be committed) -//! : : ... / -//! : : -//! : : -//! : /1/... \ -//! : /2/... | More sequencers -//! : ... / -//! : -//! : -//! // \ -//! // | Incomplete initialization attempts -//! ... / +//! : +--------+ +//! : | +//! : | +//! : | +//! /version/ V +//! // +//! : /0/ +//! : : /committed/0 \ +//! : : : /1 | Message files +//! : : : /2 | (finished) +//! : : : ... / +//! : : : +//! : : : +//! : : /temp/ \ +//! : : / | Message files +//! : : / | (to be committed) +//! : : ... / +//! : : +//! : : +//! : /1/... \ +//! : /2/... | More sequencers +//! : ... / +//! : +//! : +//! // \ +//! // | Incomplete initialization attempts +//! ... / //! ``` //! //! Every message file then uses an HTTP-inspired format: @@ -224,13 +224,13 @@ impl WriteBufferWriting for FileBufferProducer { message.extend(b"\n"); message.extend(entry.data()); - // write data to scratchpad file - let pad_file = sequencer_path.join("pad").join(Uuid::new_v4().to_string()); - tokio::fs::write(&pad_file, &message).await?; + // write data to scratchpad file in temp directory + let temp_file = sequencer_path.join("temp").join(Uuid::new_v4().to_string()); + tokio::fs::write(&temp_file, &message).await?; // scan existing files to figure out new sequence number - let cur = sequencer_path.join("cur"); - let existing_files = scan_dir::(&cur, FileType::File).await?; + let committed = sequencer_path.join("committed"); + let existing_files = scan_dir::(&committed, FileType::File).await?; let mut sequence_number = if let Some(max) = existing_files.keys().max() { max.checked_add(1).ok_or_else::(|| { "Overflow during sequence number calculation" @@ -243,8 +243,11 @@ impl WriteBufferWriting for FileBufferProducer { // try to link scratchpad file to "current" dir loop { - let cur_file = cur.join(sequence_number.to_string()); - if tokio::fs::hard_link(&pad_file, &cur_file).await.is_ok() { + let committed_file = committed.join(sequence_number.to_string()); + if tokio::fs::hard_link(&temp_file, &committed_file) + .await + .is_ok() + { break; } sequence_number = sequence_number @@ -257,7 +260,7 @@ impl WriteBufferWriting for FileBufferProducer { } // unlink scratchpad file (and ignore error) - tokio::fs::remove_file(&pad_file).await.ok(); + tokio::fs::remove_file(&temp_file).await.ok(); Ok((Sequence::new(sequencer_id, sequence_number), now)) } @@ -302,21 +305,21 @@ impl WriteBufferReading for FileBufferConsumer { let mut streams = BTreeMap::default(); for (sequencer_id, (sequencer_path, next_sequence_number)) in &self.dirs { - let cur = sequencer_path.join("cur"); + let committed = sequencer_path.join("committed"); let stream = ConsumerStream::new( *sequencer_id, - cur.clone(), + committed.clone(), Arc::clone(next_sequence_number), self.trace_collector.clone(), ) .boxed(); let fetch_high_watermark = move || { - let cur = cur.clone(); + let committed = committed.clone(); let fut = async move { - let files = scan_dir::(&cur, FileType::File).await?; + let files = scan_dir::(&committed, FileType::File).await?; let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0); Ok(watermark) @@ -525,48 +528,48 @@ async fn maybe_auto_create_directories( creation_config: Option<&WriteBufferCreationConfig>, ) -> Result, WriteBufferError> { loop { - // figure out if a current version exists - let cur = root.join("cur"); - if tokio::fs::metadata(&cur).await.is_ok() { + // figure out if a active version exists + let active = root.join("active"); + if tokio::fs::metadata(&active).await.is_ok() { // Scan for directories - let directories = scan_dir(&cur, FileType::Dir).await?; + let directories = scan_dir(&active, FileType::Dir).await?; if directories.is_empty() { - return Err("Current configuration has zero sequencers." + return Err("Active configuration has zero sequencers." .to_string() .into()); } return Ok(directories); } - // no current config exists + // no active config exists if let Some(creation_config) = creation_config { - // create scratchpad directory - let pad = root.join("pad").join(Uuid::new_v4().to_string()); - tokio::fs::create_dir_all(&pad).await?; + // create version directory + let version = root.join("version").join(Uuid::new_v4().to_string()); + tokio::fs::create_dir_all(&version).await?; let mut directories = BTreeMap::new(); for sequencer_id in 0..creation_config.n_sequencers.get() { - let sequencer_path_in_pad = pad.join(sequencer_id.to_string()); - tokio::fs::create_dir(&sequencer_path_in_pad).await?; + let sequencer_path_in_version = version.join(sequencer_id.to_string()); + tokio::fs::create_dir(&sequencer_path_in_version).await?; - let sequencer_path_in_pad_cur = sequencer_path_in_pad.join("cur"); - tokio::fs::create_dir(&sequencer_path_in_pad_cur).await?; + let committed = sequencer_path_in_version.join("committed"); + tokio::fs::create_dir(&committed).await?; - let sequencer_path_in_pad_pad = sequencer_path_in_pad.join("pad"); - tokio::fs::create_dir(&sequencer_path_in_pad_pad).await?; + let temp = sequencer_path_in_version.join("temp"); + tokio::fs::create_dir(&temp).await?; - let sequencer_path_in_cur = cur.join(sequencer_id.to_string()); - directories.insert(sequencer_id, sequencer_path_in_cur); + let sequencer_path_in_active = active.join(sequencer_id.to_string()); + directories.insert(sequencer_id, sequencer_path_in_active); } - // symlink cur->pad - if tokio::fs::symlink(&pad, cur).await.is_ok() { + // symlink active->version + if tokio::fs::symlink(&version, active).await.is_ok() { // linking worked return Ok(directories); } else { - // linking did not work, assuming a concurrent initialization process. Remove scratchpad and and try again - tokio::fs::remove_dir_all(&pad).await?; + // linking did not work, assuming a concurrent initialization process. Remove version and and try again. + tokio::fs::remove_dir_all(&version).await?; continue; } } else {