refactor: extract OnceNonZeroU32 (#1134)
parent
864f9bcd35
commit
d4c090cb84
|
@ -7,5 +7,6 @@
|
||||||
)]
|
)]
|
||||||
|
|
||||||
pub mod data;
|
pub mod data;
|
||||||
|
pub mod once;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod selection;
|
pub mod selection;
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
|
||||||
|
/// A non-zero value that can be set once
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct OnceNonZeroU32(AtomicU32);
|
||||||
|
|
||||||
|
impl OnceNonZeroU32 {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(AtomicU32::new(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self) -> Option<NonZeroU32> {
|
||||||
|
NonZeroU32::new(self.0.load(Ordering::Relaxed))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set(&self, value: NonZeroU32) -> Result<(), NonZeroU32> {
|
||||||
|
match self
|
||||||
|
.0
|
||||||
|
.compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err(id) => Err(NonZeroU32::new(id).unwrap()), // Must be non-zero in order to fail
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_once_non_zero() {
|
||||||
|
let a = OnceNonZeroU32::default();
|
||||||
|
|
||||||
|
assert!(a.get().is_none());
|
||||||
|
a.set(NonZeroU32::new(293).unwrap()).unwrap();
|
||||||
|
assert_eq!(a.get().unwrap().get(), 293);
|
||||||
|
assert_eq!(
|
||||||
|
a.set(NonZeroU32::new(2334).unwrap()).unwrap_err().get(),
|
||||||
|
293
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -67,10 +67,7 @@
|
||||||
clippy::clone_on_ref_ptr
|
clippy::clone_on_ref_ptr
|
||||||
)]
|
)]
|
||||||
|
|
||||||
use std::sync::{
|
use std::sync::Arc;
|
||||||
atomic::{AtomicU32, Ordering},
|
|
||||||
Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
@ -85,7 +82,10 @@ use data_types::{
|
||||||
{DatabaseName, DatabaseNameError},
|
{DatabaseName, DatabaseNameError},
|
||||||
};
|
};
|
||||||
use influxdb_line_protocol::ParsedLine;
|
use influxdb_line_protocol::ParsedLine;
|
||||||
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
|
use internal_types::{
|
||||||
|
data::{lines_to_replicated_write, ReplicatedWrite},
|
||||||
|
once::OnceNonZeroU32,
|
||||||
|
};
|
||||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||||
use query::{exec::Executor, Database, DatabaseStore};
|
use query::{exec::Executor, Database, DatabaseStore};
|
||||||
use tracker::task::{
|
use tracker::task::{
|
||||||
|
@ -134,7 +134,7 @@ pub enum Error {
|
||||||
#[snafu(display("error replicating to remote: {}", source))]
|
#[snafu(display("error replicating to remote: {}", source))]
|
||||||
ErrorReplicating { source: DatabaseError },
|
ErrorReplicating { source: DatabaseError },
|
||||||
#[snafu(display("id already set"))]
|
#[snafu(display("id already set"))]
|
||||||
IdAlreadySet { id: u32 },
|
IdAlreadySet { id: NonZeroU32 },
|
||||||
#[snafu(display("unable to use server until id is set"))]
|
#[snafu(display("unable to use server until id is set"))]
|
||||||
IdNotSet,
|
IdNotSet,
|
||||||
#[snafu(display("error serializing configuration {}", source))]
|
#[snafu(display("error serializing configuration {}", source))]
|
||||||
|
@ -186,7 +186,7 @@ const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
|
||||||
/// of these structs, which keeps track of all replication and query rules.
|
/// of these structs, which keeps track of all replication and query rules.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Server<M: ConnectionManager> {
|
pub struct Server<M: ConnectionManager> {
|
||||||
id: AtomicU32,
|
id: OnceNonZeroU32,
|
||||||
config: Arc<Config>,
|
config: Arc<Config>,
|
||||||
connection_manager: Arc<M>,
|
connection_manager: Arc<M>,
|
||||||
pub store: Arc<ObjectStore>,
|
pub store: Arc<ObjectStore>,
|
||||||
|
@ -199,7 +199,7 @@ impl<M: ConnectionManager> Server<M> {
|
||||||
let jobs = Arc::new(JobRegistry::new());
|
let jobs = Arc::new(JobRegistry::new());
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
id: AtomicU32::new(0),
|
id: Default::default(),
|
||||||
config: Arc::new(Config::new(Arc::clone(&jobs))),
|
config: Arc::new(Config::new(Arc::clone(&jobs))),
|
||||||
store,
|
store,
|
||||||
connection_manager: Arc::new(connection_manager),
|
connection_manager: Arc::new(connection_manager),
|
||||||
|
@ -213,15 +213,12 @@ impl<M: ConnectionManager> Server<M> {
|
||||||
///
|
///
|
||||||
/// A valid server ID Must be non-zero.
|
/// A valid server ID Must be non-zero.
|
||||||
pub fn set_id(&self, id: NonZeroU32) -> Result<()> {
|
pub fn set_id(&self, id: NonZeroU32) -> Result<()> {
|
||||||
self.id
|
self.id.set(id).map_err(|id| Error::IdAlreadySet { id })
|
||||||
.compare_exchange(0, id.get(), Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.map_err(|id| Error::IdAlreadySet { id })?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current server ID, or an error if not yet set.
|
/// Returns the current server ID, or an error if not yet set.
|
||||||
pub fn require_id(&self) -> Result<NonZeroU32> {
|
pub fn require_id(&self) -> Result<NonZeroU32> {
|
||||||
NonZeroU32::new(self.id.load(Ordering::Relaxed)).context(IdNotSet)
|
self.id.get().context(IdNotSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tells the server the set of rules for a database.
|
/// Tells the server the set of rules for a database.
|
||||||
|
|
Loading…
Reference in New Issue