Merge pull request #576 from influxdata/dom/atomic-server-id

perf: avoid locking to load server ID
pull/24376/head
Dom 2020-12-18 14:08:47 +00:00 committed by GitHub
commit feb3064f92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 9 deletions

View File

@ -3,7 +3,7 @@
use std::{
collections::BTreeMap,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU32, AtomicU64, Ordering},
Arc,
},
};
@ -29,6 +29,9 @@ use tokio::sync::RwLock;
type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A server ID of 0 is reserved and indicates no ID has been configured.
const SERVER_ID_NOT_SET: u32 = 0;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Server error: {}", source))]
@ -69,6 +72,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// of these structs, which keeps track of all replication and query rules.
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
id: AtomicU32,
config: RwLock<Config>,
connection_manager: M,
pub store: Arc<ObjectStore>,
@ -76,8 +80,6 @@ pub struct Server<M: ConnectionManager> {
#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
struct Config {
// id is optional because this may not be set on startup. It might be set via an API call
id: Option<u32>,
databases: BTreeMap<DatabaseName<'static>, Db>,
host_groups: BTreeMap<HostGroupId, HostGroup>,
}
@ -85,6 +87,7 @@ struct Config {
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
Self {
id: AtomicU32::new(SERVER_ID_NOT_SET),
config: RwLock::new(Config::default()),
store,
connection_manager,
@ -92,15 +95,19 @@ impl<M: ConnectionManager> Server<M> {
}
/// sets the id of the server, which is used for replication and the base
/// path in object storage
/// path in object storage.
///
/// A valid server ID Must be non-zero.
pub async fn set_id(&self, id: u32) {
let mut config = self.config.write().await;
config.id = Some(id);
self.id.store(id, Ordering::Release)
}
/// Returns the current server ID, or an error if not yet set.
async fn require_id(&self) -> Result<u32> {
let config = self.config.read().await;
config.id.context(IdNotSet)
match self.id.load(Ordering::Acquire) {
SERVER_ID_NOT_SET => Err(Error::IdNotSet),
v => Ok(v),
}
}
/// Tells the server the set of rules for a database. Currently, this is not
@ -706,7 +713,7 @@ partition_key:
.await
.unwrap();
let config = r#"{"id":1,"databases":{"foo":{"partition_template":{"parts":[]},"store_locally":false,"replication":["az1"],"replication_count":1,"replication_queue_max_size":0,"subscriptions":[],"query_local":false,"primary_query_group":null,"secondary_query_groups":[],"read_only_partitions":[],"wal_buffer_config":null}},"host_groups":{"az1":{"id":"az1","hosts":["serverA"]}}}"#;
let config = r#"{"databases":{"foo":{"partition_template":{"parts":[]},"store_locally":false,"replication":["az1"],"replication_count":1,"replication_queue_max_size":0,"subscriptions":[],"query_local":false,"primary_query_group":null,"secondary_query_groups":[],"read_only_partitions":[],"wal_buffer_config":null}},"host_groups":{"az1":{"id":"az1","hosts":["serverA"]}}}"#;
let read_data = std::str::from_utf8(&*read_data).unwrap();
println!("\n\n{}\n", read_data);
assert_eq!(read_data, config);