Merge pull request #389 from influxdata/pd-database_rules_persistence

feat: implement API for storing the server configuration
pull/24376/head
Paul Dix 2020-10-26 15:47:48 -04:00 committed by GitHub
commit 1fe8f517cc
5 changed files with 194 additions and 30 deletions
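In short: the server's id, database rules, and host groups now live in a single serializable Config, and two new methods round-trip it through object storage as JSON. A minimal sketch of the intended use, assuming Server, ConnectionManager, and Result are exported from delorean_cluster's crate root as this diff suggests (the manager argument is a placeholder for any ConnectionManager implementation):

    use delorean_cluster::{ConnectionManager, Result, Server};
    use delorean_object_store::{InMemory, ObjectStore};

    async fn round_trip<M: ConnectionManager>(manager: M) -> Result<()> {
        // In-memory store for illustration; a real deployment would configure S3 or GCS.
        let store = ObjectStore::new_in_memory(InMemory::new());
        let mut server = Server::new(manager, store);

        // The writer id must be set first: it becomes the path prefix for config.json.
        server.set_id(1);

        // ... create_database / create_host_group calls go here ...

        // Serializes the Config to JSON and writes it to 1/config.json in the store.
        server.store_configuration().await?;

        // Replaces the in-memory Config with whatever is stored under writer id 1.
        server.load_configuration(1).await?;
        Ok(())
    }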

Cargo.lock (generated)

@ -782,12 +782,16 @@ name = "delorean_cluster"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"delorean_arrow",
"delorean_data_types",
"delorean_generated_types",
"delorean_line_parser",
"delorean_object_store",
"delorean_storage",
"delorean_write_buffer",
"futures",
"serde",
"serde_json",
"snafu",
"tokio",


@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
snafu = "0.6"
serde = "1.0"
serde_json = "1.0"
async-trait = "0.1"
delorean_data_types = { path = "../delorean_data_types" }
@ -15,5 +16,8 @@ delorean_generated_types = { path = "../delorean_generated_types" }
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_storage = { path = "../delorean_storage" }
delorean_write_buffer = { path = "../delorean_write_buffer" }
delorean_object_store = { path = "../delorean_object_store" }
tokio = { version = "0.2", features = ["full"] }
delorean_arrow = { path = "../delorean_arrow" }
futures = "0.3.7"
bytes = "0.5"


@ -65,22 +65,27 @@
use std::{
collections::BTreeMap,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use delorean_arrow::arrow::record_batch::RecordBatch;
use delorean_data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, HostGroup, HostGroupId},
database_rules::{DatabaseRules, HostGroup, HostGroupId, MatchTables},
};
use delorean_line_parser::ParsedLine;
use delorean_object_store::ObjectStore;
use delorean_storage::Database;
use delorean_write_buffer::Db as WriteBufferDb;
use async_trait::async_trait;
use delorean_arrow::arrow::record_batch::RecordBatch;
use delorean_data_types::database_rules::MatchTables;
use bytes::Bytes;
use futures::stream::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::atomic::Ordering;
type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -107,6 +112,14 @@ pub enum Error {
ErrorReplicating { source: DatabaseError },
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
#[snafu(display("error serializing configuration {}", source))]
ErrorSerializing { source: serde_json::Error },
#[snafu(display("error deserializing configuration {}", source))]
ErrorDeserializing { source: serde_json::Error },
#[snafu(display("store error: {}", source))]
StoreError {
source: delorean_object_store::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -116,30 +129,35 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// of all replication and query rules.
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
config: Config,
connection_manager: M,
store: ObjectStore,
}
#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
struct Config {
// id is optional because this may not be set on startup. It might be set via an API call
id: Option<u32>,
databases: BTreeMap<String, Db>,
host_groups: BTreeMap<HostGroupId, HostGroup>,
connection_manager: M,
}
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M) -> Self {
pub fn new(connection_manager: M, store: ObjectStore) -> Self {
Self {
id: None,
databases: BTreeMap::new(),
host_groups: BTreeMap::new(),
config: Config::default(),
store,
connection_manager,
}
}
/// Sets the id of the server, which is used for replication and as the base path in object storage
pub fn set_id(&mut self, id: u32) {
self.id = Some(id);
self.config.id = Some(id);
}
fn require_id(&self) -> Result<u32> {
Ok(self.id.context(IdNotSet)?)
Ok(self.config.id.context(IdNotSet)?)
}
/// Tells the server the set of rules for a database. Currently, this is not persisted and
@ -166,7 +184,7 @@ impl<M: ConnectionManager> Server<M> {
sequence,
};
self.databases.insert(db_name, db);
self.config.databases.insert(db_name, db);
Ok(())
}
@ -177,7 +195,52 @@ impl<M: ConnectionManager> Server<M> {
pub async fn create_host_group(&mut self, id: HostGroupId, hosts: Vec<String>) -> Result<()> {
self.require_id()?;
self.host_groups.insert(id.clone(), HostGroup { id, hosts });
self.config
.host_groups
.insert(id.clone(), HostGroup { id, hosts });
Ok(())
}
/// Saves the configuration of database rules and host groups to a single JSON file in
/// the configured store under the path /<writer ID>/config.json
pub async fn store_configuration(&self) -> Result<()> {
let id = self.require_id()?;
let data = Bytes::from(serde_json::to_vec(&self.config).context(ErrorSerializing)?);
let len = data.len();
let location = config_location(id);
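// `put` consumes a stream of io::Result<Bytes> chunks plus the total length, so the
// serialized config is wrapped in a single-item stream via futures::stream::once.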
let stream_data = std::io::Result::Ok(data);
self.store
.put(
&location,
futures::stream::once(async move { stream_data }),
len,
)
.await
.context(StoreError)?;
Ok(())
}
/// Loads the configuration for this server from the configured store. This replaces
/// any in-memory configuration that might already be set.
pub async fn load_configuration(&mut self, id: u32) -> Result<()> {
let location = config_location(id);
let read_data = self
.store
.get(&location)
.await
.context(StoreError)?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(StoreError)?;
let config: Config = serde_json::from_slice(&read_data).context(ErrorDeserializing)?;
self.config = config;
Ok(())
}
@ -189,6 +252,7 @@ impl<M: ConnectionManager> Server<M> {
let id = self.require_id()?;
let db = self
.config
.databases
.get(db_name)
.context(DatabaseNotFound { db: db_name })?;
@ -204,6 +268,7 @@ impl<M: ConnectionManager> Server<M> {
/// Executes a query against the local write buffer database, if one exists.
pub async fn query_local(&self, db_name: &str, query: &str) -> Result<Vec<RecordBatch>> {
let db = self
.config
.databases
.get(db_name)
.context(DatabaseNotFound { db: db_name })?;
@ -254,6 +319,7 @@ impl<M: ConnectionManager> Server<M> {
write: &ReplicatedWrite,
) -> Result<()> {
let group = self
.config
.host_groups
.get(host_group_id)
.context(HostGroupNotFound { id: host_group_id })?;
@ -305,13 +371,23 @@ pub trait RemoteServer {
) -> Result<(), Self::Error>;
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct Db {
#[serde(flatten)]
pub rules: DatabaseRules,
#[serde(skip)]
pub buffer: Option<WriteBufferDb>,
#[serde(skip)]
sequence: AtomicU64,
}
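// `buffer` and `sequence` are runtime state (and #[serde(skip)]ped above), so equality
// deliberately compares only the persisted `rules`; a freshly deserialized Db therefore
// compares equal to the one that was stored.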
impl PartialEq for Db {
fn eq(&self, other: &Self) -> bool {
self.rules == other.rules
}
}
impl Eq for Db {}
const STARTING_SEQUENCE: u64 = 1;
impl Db {
@ -320,6 +396,11 @@ impl Db {
}
}
// location in the store for the configuration file
fn config_location(id: u32) -> String {
format!("{}/config.json", id)
}
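// For writer id 1 this yields "1/config.json", exactly the location the
// round-trip test below reads back.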
#[cfg(test)]
mod tests {
use super::*;
@ -327,6 +408,8 @@ mod tests {
use delorean_arrow::arrow::{csv, util::string_writer::StringWriter};
use delorean_data_types::database_rules::{MatchTables, Matcher, Subscription};
use delorean_line_parser::parse_lines;
use delorean_object_store::{InMemory, ObjectStoreIntegration};
use futures::TryStreamExt;
use snafu::Snafu;
use std::sync::Mutex;
@ -336,7 +419,8 @@ mod tests {
#[tokio::test]
async fn server_api_calls_return_error_with_no_id_set() -> Result {
let manager = TestConnectionManager::new();
let mut server = Server::new(manager);
let store = ObjectStore::new_in_memory(InMemory::new());
let mut server = Server::new(manager, store);
let rules = DatabaseRules::default();
let resp = server.create_database("foo", rules).await.unwrap_err();
@ -358,7 +442,8 @@ mod tests {
#[tokio::test]
async fn writes_local() -> Result {
let manager = TestConnectionManager::new();
let mut server = Server::new(manager);
let store = ObjectStore::new_in_memory(InMemory::new());
let mut server = Server::new(manager, store);
server.set_id(1);
let rules = DatabaseRules {
store_locally: true,
@ -396,7 +481,9 @@ mod tests {
.remotes
.insert(remote_id.to_string(), remote.clone());
let mut server = Server::new(manager);
let store = ObjectStore::new_in_memory(InMemory::new());
let mut server = Server::new(manager, store);
server.set_id(1);
let host_group_id = "az1".to_string();
let rules = DatabaseRules {
@ -453,7 +540,9 @@ partition_key:
.remotes
.insert(remote_id.to_string(), remote.clone());
let mut server = Server::new(manager);
let store = ObjectStore::new_in_memory(InMemory::new());
let mut server = Server::new(manager, store);
server.set_id(1);
let host_group_id = "az1".to_string();
let rules = DatabaseRules {
@ -507,6 +596,59 @@ partition_key:
Ok(())
}
#[tokio::test]
async fn store_and_load_configuration() -> Result {
let manager = TestConnectionManager::new();
let store = ObjectStore::new_in_memory(InMemory::new());
let mut server = Server::new(manager, store);
server.set_id(1);
let host_group_id = "az1".to_string();
let remote_id = "serverA";
let rules = DatabaseRules {
replication: vec![host_group_id.clone()],
replication_count: 1,
..Default::default()
};
server
.create_host_group(host_group_id.clone(), vec![remote_id.to_string()])
.await
.unwrap();
let db_name = "foo";
server.create_database(db_name, rules).await.unwrap();
server.store_configuration().await.unwrap();
let location = "1/config.json";
let read_data = server
.store
.get(location)
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();
let config = r#"{"id":1,"databases":{"foo":{"partition_template":{"parts":[]},"store_locally":false,"replication":["az1"],"replication_count":1,"replication_queue_max_size":0,"subscriptions":[],"query_local":false,"primary_query_group":null,"secondary_query_groups":[],"read_only_partitions":[]}},"host_groups":{"az1":{"id":"az1","hosts":["serverA"]}}}"#;
let read_data = std::str::from_utf8(&*read_data).unwrap();
assert_eq!(read_data, config);
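// Recovery path: build a second server over a copy of the same backing store
// (possible now that ObjectStore's inner ObjectStoreIntegration is public and
// InMemory has a clone method) and check that load_configuration reproduces
// the original Config.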
let manager = TestConnectionManager::new();
let store = match server.store.0 {
ObjectStoreIntegration::InMemory(in_mem) => in_mem.clone().await,
_ => panic!("wrong type"),
};
let store = ObjectStore::new_in_memory(store);
let mut recovered_server = Server::new(manager, store);
assert_ne!(server.config, recovered_server.config);
recovered_server.load_configuration(1).await.unwrap();
assert_eq!(server.config, recovered_server.config);
Ok(())
}
#[derive(Snafu, Debug, Clone)]
enum TestClusterError {
#[snafu(display("Test delorean_cluster error: {}", message))]


@ -17,7 +17,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// DatabaseRules contains the rules for replicating data, sending data to subscribers, and
/// querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct DatabaseRules {
/// Template that generates a partition key for each row inserted into the db
pub partition_template: PartitionTemplate,
@ -87,7 +87,7 @@ impl DatabaseRules {
///
/// The key is constructed in order of the template parts; thus ordering changes what partition
/// key is generated.
#[derive(Debug, Serialize, Deserialize, Default)]
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct PartitionTemplate {
parts: Vec<TemplatePart>,
}
@ -123,7 +123,7 @@ impl PartitionTemplate {
}
/// `TemplatePart` specifies what part of a row should be used to compute this part of a partition key.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum TemplatePart {
Table,
Column(String),
@ -133,7 +133,7 @@ pub enum TemplatePart {
}
/// `RegexCapture` is for pulling parts of a string column into the partition key.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct RegexCapture {
column: String,
regex: String,
@ -141,7 +141,7 @@ pub struct RegexCapture {
/// `StrftimeColumn` can be used to create a time based partition key off some column other than
/// the builtin `time` column.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct StrftimeColumn {
column: String,
format: String,
@ -162,7 +162,7 @@ pub type WriterId = String;
///
/// For pull based subscriptions, the requester will send a matcher, which the receiver
/// will execute against its in-memory WAL.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Subscription {
pub name: String,
pub host_group_id: HostGroupId,
@ -171,7 +171,7 @@ pub struct Subscription {
/// `Matcher` specifies the rule against the table name and/or a predicate
/// against the row to determine if it matches the write rule.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Matcher {
#[serde(flatten)]
pub tables: MatchTables,
@ -182,7 +182,7 @@ pub struct Matcher {
/// `MatchTables` looks at the table name of a row to determine if it should
/// match the rule.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum MatchTables {
#[serde(rename = "*")]
@ -193,7 +193,7 @@ pub enum MatchTables {
pub type HostGroupId = String;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct HostGroup {
pub id: HostGroupId,
/// `hosts` is a vector of connection strings for remote hosts.


@ -26,7 +26,7 @@ use tokio_util::codec::{BytesCodec, FramedRead};
/// Universal interface to multiple object store services.
#[derive(Debug)]
pub struct ObjectStore(ObjectStoreIntegration);
pub struct ObjectStore(pub ObjectStoreIntegration);
impl ObjectStore {
/// Configure a connection to Amazon S3.
@ -108,10 +108,14 @@ impl ObjectStore {
/// All supported object storage integrations
#[derive(Debug)]
enum ObjectStoreIntegration {
pub enum ObjectStoreIntegration {
/// GCP storage
GoogleCloudStorage(GoogleCloudStorage),
/// Amazon storage
AmazonS3(AmazonS3),
/// In memory storage for testing
InMemory(InMemory),
/// Local file system storage
File(File),
}
@ -370,6 +374,16 @@ impl InMemory {
Self::default()
}
/// Creates a clone of the store
pub async fn clone(&self) -> Self {
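// Async because it must await the RwLock read guard; the stored map is
// deep-copied into a fresh lock, so the clone is independent of the original.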
let storage = self.storage.read().await;
let storage = storage.clone();
Self {
storage: RwLock::new(storage),
}
}
/// Save the provided bytes to the specified location.
async fn put<S>(&self, location: &str, bytes: S, length: usize) -> InternalResult<()>
where