From d4c090cb846824c3b19c2e3a0737e27a05886dba Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 6 Apr 2021 22:38:04 +0100 Subject: [PATCH] refactor: extract OnceNonZeroU32 (#1134) --- internal_types/src/lib.rs | 1 + internal_types/src/once.rs | 44 ++++++++++++++++++++++++++++++++++++++ server/src/lib.rs | 23 +++++++++----------- 3 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 internal_types/src/once.rs diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index ecaf7f6243..3488faf07c 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -7,5 +7,6 @@ )] pub mod data; +pub mod once; pub mod schema; pub mod selection; diff --git a/internal_types/src/once.rs b/internal_types/src/once.rs new file mode 100644 index 0000000000..d509b944ca --- /dev/null +++ b/internal_types/src/once.rs @@ -0,0 +1,44 @@ +use std::num::NonZeroU32; +use std::sync::atomic::{AtomicU32, Ordering}; + +/// A non-zero value that can be set once +#[derive(Debug, Default)] +pub struct OnceNonZeroU32(AtomicU32); + +impl OnceNonZeroU32 { + pub fn new() -> Self { + Self(AtomicU32::new(0)) + } + + pub fn get(&self) -> Option { + NonZeroU32::new(self.0.load(Ordering::Relaxed)) + } + + pub fn set(&self, value: NonZeroU32) -> Result<(), NonZeroU32> { + match self + .0 + .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => Ok(()), + Err(id) => Err(NonZeroU32::new(id).unwrap()), // Must be non-zero in order to fail + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_once_non_zero() { + let a = OnceNonZeroU32::default(); + + assert!(a.get().is_none()); + a.set(NonZeroU32::new(293).unwrap()).unwrap(); + assert_eq!(a.get().unwrap().get(), 293); + assert_eq!( + a.set(NonZeroU32::new(2334).unwrap()).unwrap_err().get(), + 293 + ); + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 56b974fbc1..fd7514b42d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -67,10 +67,7 @@ clippy::clone_on_ref_ptr )] -use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, -}; +use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; @@ -85,7 +82,10 @@ use data_types::{ {DatabaseName, DatabaseNameError}, }; use influxdb_line_protocol::ParsedLine; -use internal_types::data::{lines_to_replicated_write, ReplicatedWrite}; +use internal_types::{ + data::{lines_to_replicated_write, ReplicatedWrite}, + once::OnceNonZeroU32, +}; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use query::{exec::Executor, Database, DatabaseStore}; use tracker::task::{ @@ -134,7 +134,7 @@ pub enum Error { #[snafu(display("error replicating to remote: {}", source))] ErrorReplicating { source: DatabaseError }, #[snafu(display("id already set"))] - IdAlreadySet { id: u32 }, + IdAlreadySet { id: NonZeroU32 }, #[snafu(display("unable to use server until id is set"))] IdNotSet, #[snafu(display("error serializing configuration {}", source))] @@ -186,7 +186,7 @@ const STORE_ERROR_PAUSE_SECONDS: u64 = 100; /// of these structs, which keeps track of all replication and query rules. #[derive(Debug)] pub struct Server { - id: AtomicU32, + id: OnceNonZeroU32, config: Arc, connection_manager: Arc, pub store: Arc, @@ -199,7 +199,7 @@ impl Server { let jobs = Arc::new(JobRegistry::new()); Self { - id: AtomicU32::new(0), + id: Default::default(), config: Arc::new(Config::new(Arc::clone(&jobs))), store, connection_manager: Arc::new(connection_manager), @@ -213,15 +213,12 @@ impl Server { /// /// A valid server ID Must be non-zero. pub fn set_id(&self, id: NonZeroU32) -> Result<()> { - self.id - .compare_exchange(0, id.get(), Ordering::Relaxed, Ordering::Relaxed) - .map_err(|id| Error::IdAlreadySet { id })?; - Ok(()) + self.id.set(id).map_err(|id| Error::IdAlreadySet { id }) } /// Returns the current server ID, or an error if not yet set. pub fn require_id(&self) -> Result { - NonZeroU32::new(self.id.load(Ordering::Relaxed)).context(IdNotSet) + self.id.get().context(IdNotSet) } /// Tells the server the set of rules for a database.