feat: share job registry with Db struct (#1038)

pull/24376/head
Raphael Taylor-Davies 2021-03-23 12:34:55 +00:00 committed by GitHub
parent 1cbfea7096
commit d4eef65f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 37 deletions

View File

@ -1,5 +1,8 @@
/// This module contains code for managing the configuration of the server.
use crate::{db::Db, Error, Result};
use std::{
collections::{BTreeMap, BTreeSet},
sync::{Arc, RwLock},
};
use data_types::{
database_rules::{DatabaseRules, WriterId},
DatabaseName,
@ -8,22 +11,28 @@ use mutable_buffer::MutableBufferDb;
use object_store::path::ObjectStorePath;
use read_buffer::Database as ReadBufferDb;
use std::{
collections::{BTreeMap, BTreeSet},
sync::{Arc, RwLock},
};
/// This module contains code for managing the configuration of the server.
use crate::{db::Db, Error, JobRegistry, Result};
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
/// The Config tracks the configuration od databases and their rules along
/// with host groups for replication. It is used as an in-memory structure
/// that can be loaded incrementally from objet storage.
#[derive(Default, Debug)]
#[derive(Debug)]
pub(crate) struct Config {
jobs: Arc<JobRegistry>,
state: RwLock<ConfigState>,
}
impl Config {
pub(crate) fn new(jobs: Arc<JobRegistry>) -> Self {
Self {
state: Default::default(),
jobs,
}
}
pub(crate) fn create_db(
&self,
name: DatabaseName<'static>,
@ -45,7 +54,13 @@ impl Config {
let read_buffer = ReadBufferDb::new();
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
let db = Arc::new(Db::new(rules, mutable_buffer, read_buffer, wal_buffer));
let db = Arc::new(Db::new(
rules,
mutable_buffer,
read_buffer,
wal_buffer,
Arc::clone(&self.jobs),
));
state.reservations.insert(name.clone());
Ok(CreateDatabaseHandle {
@ -147,13 +162,14 @@ impl<'a> Drop for CreateDatabaseHandle<'a> {
#[cfg(test)]
mod test {
use super::*;
use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi};
use super::*;
#[test]
fn create_db() {
let name = DatabaseName::new("foo").unwrap();
let config = Config::default();
let config = Config::new(Arc::new(JobRegistry::new()));
let rules = DatabaseRules::new();
{

View File

@ -10,20 +10,20 @@ use std::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::info;
pub(crate) use chunk::DBChunk;
use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
use internal_types::{data::ReplicatedWrite, selection::Selection};
use mutable_buffer::MutableBufferDb;
use parking_lot::Mutex;
use query::{Database, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::buffer::Buffer;
use tracing::info;
use crate::{buffer::Buffer, JobRegistry};
mod chunk;
pub(crate) use chunk::DBChunk;
pub mod pred;
mod streams;
@ -100,6 +100,8 @@ pub struct Db {
/// and to persist segments in object storage for recovery.
pub wal_buffer: Option<Mutex<Buffer>>,
jobs: Arc<JobRegistry>,
sequence: AtomicU64,
}
impl Db {
@ -108,6 +110,7 @@ impl Db {
mutable_buffer: Option<MutableBufferDb>,
read_buffer: ReadBufferDb,
wal_buffer: Option<Buffer>,
jobs: Arc<JobRegistry>,
) -> Self {
let wal_buffer = wal_buffer.map(Mutex::new);
let read_buffer = Arc::new(read_buffer);
@ -116,6 +119,7 @@ impl Db {
mutable_buffer,
read_buffer,
wal_buffer,
jobs,
sequence: AtomicU64::new(STARTING_SEQUENCE),
}
}
@ -376,10 +380,6 @@ impl Database for Db {
#[cfg(test)]
mod tests {
use crate::query_tests::utils::make_db;
use super::*;
use arrow_deps::{
arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect,
};
@ -392,6 +392,10 @@ mod tests {
};
use test_helpers::assert_contains;
use crate::query_tests::utils::make_db;
use super::*;
#[tokio::test]
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
@ -592,6 +596,7 @@ mod tests {
buffer_size: 300,
..Default::default()
};
let rules = DatabaseRules {
mutable_buffer_config: Some(mbconf.clone()),
..Default::default()
@ -602,6 +607,7 @@ mod tests {
Some(MutableBufferDb::new("foo")),
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
);
let mut writer = TestLPWriter::default();

View File

@ -84,9 +84,8 @@ use data_types::{
job::Job,
{DatabaseName, DatabaseNameError},
};
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
use influxdb_line_protocol::ParsedLine;
use internal_types::data::{lines_to_replicated_write, ReplicatedWrite};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore};
@ -95,7 +94,9 @@ use crate::{
object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME,
},
db::Db,
tracker::{TrackedFutureExt, Tracker, TrackerId, TrackerRegistryWithHistory},
tracker::{
TrackedFutureExt, Tracker, TrackerId, TrackerRegistration, TrackerRegistryWithHistory,
},
};
pub mod buffer;
@ -149,9 +150,34 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
const JOB_HISTORY_SIZE: usize = 1000;
/// The global job registry
#[derive(Debug)]
pub struct JobRegistry {
inner: Mutex<TrackerRegistryWithHistory<Job>>,
}
impl Default for JobRegistry {
fn default() -> Self {
Self {
inner: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)),
}
}
}
impl JobRegistry {
fn new() -> Self {
Default::default()
}
pub fn register(&self, job: Job) -> (Tracker<Job>, TrackerRegistration) {
self.inner.lock().register(job)
}
}
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
/// `Server` is the container struct for how servers store data internally, as
/// well as how they communicate with other servers. Each server will have one
/// of these structs, which keeps track of all replication and query rules.
@ -162,18 +188,20 @@ pub struct Server<M: ConnectionManager> {
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
jobs: Mutex<TrackerRegistryWithHistory<Job>>,
jobs: Arc<JobRegistry>,
}
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
let jobs = Arc::new(JobRegistry::new());
Self {
id: AtomicU32::new(SERVER_ID_NOT_SET),
config: Arc::new(Config::default()),
config: Arc::new(Config::new(Arc::clone(&jobs))),
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
jobs: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)),
jobs,
}
}
@ -354,7 +382,7 @@ impl<M: ConnectionManager> Server<M> {
let writer_id = self.require_id()?;
let store = Arc::clone(&self.store);
let (_, tracker) = self.jobs.lock().register(Job::PersistSegment {
let (_, tracker) = self.jobs.register(Job::PersistSegment {
writer_id,
segment_id: segment.id,
});
@ -390,7 +418,7 @@ impl<M: ConnectionManager> Server<M> {
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> Tracker<Job> {
let (tracker, registration) = self.jobs.lock().register(Job::Dummy {
let (tracker, registration) = self.jobs.register(Job::Dummy {
nanos: nanos.clone(),
});
@ -422,7 +450,7 @@ impl<M: ConnectionManager> Server<M> {
.db(&name)
.context(DatabaseNotFound { db_name: &db_name })?;
let (tracker, registration) = self.jobs.lock().register(Job::CloseChunk {
let (tracker, registration) = self.jobs.register(Job::CloseChunk {
db_name: db_name.clone(),
partition_key: partition_key.clone(),
chunk_id,
@ -467,12 +495,12 @@ impl<M: ConnectionManager> Server<M> {
/// Returns a list of all jobs tracked by this server
pub fn tracked_jobs(&self) -> Vec<Tracker<Job>> {
self.jobs.lock().tracked()
self.jobs.inner.lock().tracked()
}
/// Returns a specific job tracked by this server
pub fn get_job(&self, id: TrackerId) -> Option<Tracker<Job>> {
self.jobs.lock().get(id)
self.jobs.inner.lock().get(id)
}
/// Background worker function
@ -480,7 +508,7 @@ impl<M: ConnectionManager> Server<M> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
while !shutdown.is_cancelled() {
self.jobs.lock().reclaim();
self.jobs.inner.lock().reclaim();
tokio::select! {
_ = interval.tick() => {},
@ -620,6 +648,8 @@ mod tests {
use futures::TryStreamExt;
use parking_lot::Mutex;
use snafu::Snafu;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use data_types::database_rules::{
@ -632,8 +662,6 @@ mod tests {
use crate::buffer::Segment;
use super::*;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = TestError> = std::result::Result<T, E>;

View File

@ -1,7 +1,8 @@
use data_types::database_rules::DatabaseRules;
use mutable_buffer::MutableBufferDb;
use crate::db::Db;
use crate::{db::Db, JobRegistry};
use std::sync::Arc;
/// Used for testing: create a Database with a local store
pub fn make_db() -> Db {
@ -11,5 +12,6 @@ pub fn make_db() -> Db {
Some(MutableBufferDb::new(name)),
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
)
}

View File

@ -361,7 +361,10 @@ impl TryClone for MemWriter {
#[cfg(test)]
mod tests {
use crate::db::{DBChunk, Db};
use crate::{
db::{DBChunk, Db},
JobRegistry,
};
use read_buffer::Database as ReadBufferDb;
use super::*;
@ -480,6 +483,7 @@ mem,host=A,region=west used=45 1
Some(MutableBufferDb::new(name)),
ReadBufferDb::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),
)
}
}