refactor: use DatabaseName in DatabaseRules (#1127)

pull/24376/head
Raphael Taylor-Davies 2021-04-06 14:26:30 +01:00 committed by GitHub
parent 7cc9f06e74
commit 5cd1d6691d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 134 additions and 169 deletions

View File

@ -37,11 +37,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// DatabaseRules contains the rules for replicating data, sending data to
/// subscribers, and querying data for a single database.
#[derive(Debug, Default, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct DatabaseRules {
/// The unencoded name of the database. This gets put in by the create
/// database call, so an empty default is fine.
pub name: String, // TODO: Use DatabaseName here
/// The name of the database
pub name: DatabaseName<'static>,
/// Template that generates a partition key for each row inserted into the
/// db
@ -73,8 +72,14 @@ impl DatabaseRules {
self.partition_template.partition_key(line, default_time)
}
pub fn new() -> Self {
Self::default()
pub fn new(name: DatabaseName<'static>) -> Self {
Self {
name,
partition_template: Default::default(),
wal_buffer_config: None,
lifecycle_rules: Default::default(),
shard_config: None,
}
}
pub fn db_name(&self) -> &str {
@ -112,7 +117,7 @@ impl Partitioner for DatabaseRules {
impl From<DatabaseRules> for management::DatabaseRules {
fn from(rules: DatabaseRules) -> Self {
Self {
name: rules.name,
name: rules.name.into(),
partition_template: Some(rules.partition_template.into()),
wal_buffer_config: rules.wal_buffer_config.map(Into::into),
lifecycle_rules: Some(rules.lifecycle_rules.into()),
@ -125,7 +130,7 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
type Error = FieldViolation;
fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> {
DatabaseName::new(&proto.name).field("name")?;
let name = DatabaseName::new(proto.name.clone()).field("name")?;
let wal_buffer_config = proto.wal_buffer_config.optional("wal_buffer_config")?;
@ -145,7 +150,7 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
.unwrap_or_default();
Ok(Self {
name: proto.name,
name,
partition_template,
wal_buffer_config,
lifecycle_rules,
@ -589,12 +594,8 @@ pub struct PartitionTemplate {
pub parts: Vec<TemplatePart>,
}
impl PartitionTemplate {
pub fn partition_key(
&self,
line: &ParsedLine<'_>,
default_time: &DateTime<Utc>,
) -> Result<String> {
impl Partitioner for PartitionTemplate {
fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
let parts: Vec<_> = self
.parts
.iter()
@ -1136,7 +1137,7 @@ mod tests {
let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
let back: management::DatabaseRules = rules.clone().into();
assert_eq!(rules.name, protobuf.name);
assert_eq!(rules.name.as_str(), protobuf.name.as_str());
assert_eq!(protobuf.name, back.name);
assert_eq!(rules.partition_template.parts.len(), 0);

View File

@ -1,6 +1,6 @@
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, Bencher, BenchmarkId, Criterion, Throughput};
use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use generated_types::wal as wb;
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
@ -92,12 +92,12 @@ fn bytes_into_struct(c: &mut Criterion) {
fn run_group(
group_name: &str,
c: &mut Criterion,
bench: impl Fn(&[ParsedLine], &DatabaseRules, &Config, &mut Bencher<WallTime>),
bench: impl Fn(&[ParsedLine], &PartitionTemplate, &Config, &mut Bencher<WallTime>),
) {
let mut group = c.benchmark_group(group_name);
group.sample_size(50);
group.measurement_time(Duration::from_secs(10));
let rules = rules_with_time_partition();
let template = partition_time();
for partition_count in [1, 100].iter() {
let config = Config {
@ -113,7 +113,7 @@ fn run_group(
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
group.bench_with_input(id, &config, |b, config| {
bench(&lines, &rules, &config, b);
bench(&lines, &template, &config, b);
});
}
@ -131,7 +131,7 @@ fn run_group(
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
group.bench_with_input(id, &config, |b, config| {
bench(&lines, &rules, &config, b);
bench(&lines, &template, &config, b);
});
}
@ -149,7 +149,7 @@ fn run_group(
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
group.bench_with_input(id, &config, |b, config| {
bench(&lines, &rules, &config, b);
bench(&lines, &template, &config, b);
});
}
@ -375,14 +375,9 @@ fn create_lp(config: &Config) -> String {
s
}
fn rules_with_time_partition() -> DatabaseRules {
let partition_template = PartitionTemplate {
fn partition_time() -> PartitionTemplate {
PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d %H:%M:%S".to_string())],
};
DatabaseRules {
partition_template,
..Default::default()
}
}

View File

@ -18,7 +18,7 @@ use crate::{
Database, DatabaseStore, PartitionChunk, Predicate,
};
use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart};
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
@ -530,12 +530,12 @@ impl TestLPWriter {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%dT%H".to_string())],
};
let rules = DatabaseRules {
partition_template,
..Default::default()
};
let write = lines_to_replicated_write(self.writer_id, self.sequence_number, &lines, &rules);
let write = lines_to_replicated_write(
self.writer_id,
self.sequence_number,
&lines,
&partition_template,
);
self.sequence_number += 1;
database
.store_replicated_write(&write)

View File

@ -581,7 +581,7 @@ fn database_object_store_path(
#[cfg(test)]
mod tests {
use super::*;
use data_types::database_rules::DatabaseRules;
use data_types::database_rules::PartitionTemplate;
use influxdb_line_protocol::parse_lines;
use internal_types::data::lines_to_replicated_write;
use object_store::memory::InMemory;
@ -959,12 +959,12 @@ mod tests {
lp: &str,
) -> Arc<ReplicatedWrite> {
let lines: Vec<_> = parse_lines(lp).map(|l| l.unwrap()).collect();
let rules = DatabaseRules::new();
let partitioner = PartitionTemplate::default();
Arc::new(lines_to_replicated_write(
writer_id,
sequence_number,
&lines,
&rules,
&partitioner,
))
}
}

View File

@ -42,33 +42,18 @@ impl Config {
}
}
pub(crate) fn create_db(
&self,
name: DatabaseName<'static>,
rules: DatabaseRules,
) -> Result<CreateDatabaseHandle<'_>> {
pub(crate) fn create_db(&self, rules: DatabaseRules) -> Result<CreateDatabaseHandle<'_>> {
let mut state = self.state.write().expect("mutex poisoned");
if state.reservations.contains(&name) || state.databases.contains_key(&name) {
if state.reservations.contains(&rules.name) || state.databases.contains_key(&rules.name) {
return Err(Error::DatabaseAlreadyExists {
db_name: name.to_string(),
db_name: rules.name.to_string(),
});
}
let read_buffer = ReadBufferDb::new();
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
let db = Arc::new(Db::new(
rules,
read_buffer,
wal_buffer,
Arc::clone(&self.jobs),
));
state.reservations.insert(name.clone());
state.reservations.insert(rules.name.clone());
Ok(CreateDatabaseHandle {
db,
rules: Some(rules),
config: &self,
name,
})
}
@ -97,11 +82,11 @@ impl Config {
state.remotes.remove(&id)
}
fn commit(&self, name: &DatabaseName<'static>, db: Arc<Db>) {
fn commit(&self, rules: DatabaseRules) {
let mut state = self.state.write().expect("mutex poisoned");
let name = state
.reservations
.take(name)
.take(&rules.name)
.expect("reservation doesn't exist");
if self.shutdown.is_cancelled() {
@ -109,6 +94,15 @@ impl Config {
return;
}
let read_buffer = ReadBufferDb::new();
let wal_buffer = rules.wal_buffer_config.as_ref().map(Into::into);
let db = Arc::new(Db::new(
rules,
read_buffer,
wal_buffer,
Arc::clone(&self.jobs),
));
let shutdown = self.shutdown.child_token();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
@ -220,20 +214,27 @@ impl Drop for DatabaseState {
/// persisted.
#[derive(Debug)]
pub(crate) struct CreateDatabaseHandle<'a> {
pub db: Arc<Db>,
pub name: DatabaseName<'static>,
/// Partial moves aren't supported on structures that implement Drop
/// so use Option to allow taking DatabaseRules out in `commit`
rules: Option<DatabaseRules>,
config: &'a Config,
}
impl<'a> CreateDatabaseHandle<'a> {
pub(crate) fn commit(self) {
self.config.commit(&self.name, Arc::clone(&self.db))
pub(crate) fn commit(mut self) {
self.config.commit(self.rules.take().unwrap())
}
pub(crate) fn rules(&self) -> &DatabaseRules {
self.rules.as_ref().unwrap()
}
}
impl<'a> Drop for CreateDatabaseHandle<'a> {
fn drop(&mut self) {
self.config.rollback(&self.name);
if let Some(rules) = self.rules.take() {
self.config.rollback(&rules.name)
}
}
}
@ -247,15 +248,15 @@ mod test {
async fn create_db() {
let name = DatabaseName::new("foo").unwrap();
let config = Config::new(Arc::new(JobRegistry::new()));
let rules = DatabaseRules::new();
let rules = DatabaseRules::new(name.clone());
{
let _db_reservation = config.create_db(name.clone(), rules.clone()).unwrap();
let err = config.create_db(name.clone(), rules.clone()).unwrap_err();
let _db_reservation = config.create_db(rules.clone()).unwrap();
let err = config.create_db(rules.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseAlreadyExists{ .. }));
}
let db_reservation = config.create_db(name.clone(), rules).unwrap();
let db_reservation = config.create_db(rules).unwrap();
db_reservation.commit();
assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -277,9 +278,9 @@ mod test {
async fn test_db_drop() {
let name = DatabaseName::new("foo").unwrap();
let config = Config::new(Arc::new(JobRegistry::new()));
let rules = DatabaseRules::new();
let rules = DatabaseRules::new(name.clone());
let db_reservation = config.create_db(name.clone(), rules).unwrap();
let db_reservation = config.create_db(rules).unwrap();
db_reservation.commit();
let token = config

View File

@ -536,7 +536,7 @@ impl Db {
) -> Tracker<Job> {
let name = self.rules.read().name.clone();
let (tracker, registration) = self.jobs.register(Job::CloseChunk {
db_name: name.clone(),
db_name: name.to_string(),
partition_key: partition_key.clone(),
chunk_id,
});
@ -730,7 +730,7 @@ mod tests {
use chrono::Utc;
use data_types::{
chunk::ChunkStorage,
database_rules::{LifecycleRules, Order, Sort, SortOrder},
database_rules::{Order, Sort, SortOrder},
partition_metadata::{ColumnSummary, StatValues, Statistics, TableSummary},
};
use query::{
@ -747,15 +747,7 @@ mod tests {
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
let db = make_db();
let rules = DatabaseRules {
lifecycle_rules: LifecycleRules {
immutable: true,
..Default::default()
},
..DatabaseRules::new()
};
let rules = RwLock::new(rules);
let db = Db { rules, ..db };
db.rules.write().lifecycle_rules.immutable = true;
let mut writer = TestLPWriter::default();
let res = writer.write_lp_string(&db, "cpu bar=1 10");

View File

@ -225,22 +225,12 @@ impl<M: ConnectionManager> Server<M> {
}
/// Tells the server the set of rules for a database.
pub async fn create_database(
&self,
db_name: impl Into<String>,
mut rules: DatabaseRules,
) -> Result<()> {
pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> {
// Return an error if this server hasn't yet been setup with an id
self.require_id()?;
let db_reservation = self.config.create_db(rules)?;
let name = db_name.into();
let db_name = DatabaseName::new(name.clone()).context(InvalidDatabaseName)?;
rules.name = name;
let db_reservation = self.config.create_db(db_name, rules)?;
let rules = db_reservation.db.rules.read().clone();
self.persist_database_rules(&db_reservation.name, rules)
self.persist_database_rules(db_reservation.rules().clone())
.await?;
db_reservation.commit();
@ -248,16 +238,13 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
pub async fn persist_database_rules<'a>(
&self,
db_name: &DatabaseName<'static>,
rules: DatabaseRules,
) -> Result<()> {
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
let mut data = BytesMut::new();
rules.encode(&mut data).context(ErrorSerializing)?;
let len = data.len();
let location = object_store_path_for_database_config(&self.root_path()?, db_name);
let stream_data = std::io::Result::Ok(data.freeze());
self.store
@ -322,12 +309,9 @@ impl<M: ConnectionManager> Server<M> {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
Ok(rules) => match DatabaseName::new(rules.name.clone()) {
Err(e) => error!("error parsing name {} from rules: {}", rules.name, e),
Ok(name) => match config.create_db(name, rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => handle.commit(),
},
Ok(rules) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => handle.commit(),
},
}
})
@ -556,7 +540,8 @@ where
let db = match self.db(&db_name) {
Some(db) => db,
None => {
self.create_database(name, DatabaseRules::new()).await?;
self.create_database(DatabaseRules::new(db_name.clone()))
.await?;
self.db(&db_name).expect("db not inserted")
}
};
@ -678,8 +663,8 @@ mod tests {
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server = Server::new(manager, store);
let rules = DatabaseRules::new();
let resp = server.create_database("foo", rules).await.unwrap_err();
let rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
let resp = server.create_database(rules).await.unwrap_err();
assert!(matches!(resp, Error::IdNotSet));
let lines = parsed_lines("cpu foo=1 10");
@ -696,24 +681,26 @@ mod tests {
let server = Server::new(manager, Arc::clone(&store));
server.set_id(1);
let name = "bananas";
let name = DatabaseName::new("bananas").unwrap();
let rules = DatabaseRules {
name: name.clone(),
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
name: name.to_string(),
..Default::default()
wal_buffer_config: None,
lifecycle_rules: Default::default(),
shard_config: None,
};
// Create a database
server
.create_database(name, rules.clone())
.create_database(rules.clone())
.await
.expect("failed to create database");
let mut rules_path = server.store.new_path();
rules_path.push_all_dirs(&["1", name]);
rules_path.push_all_dirs(&["1", name.as_str()]);
rules_path.set_file_name("rules.pb");
let read_data = server
@ -731,9 +718,9 @@ mod tests {
assert_eq!(rules, read_rules);
let db2 = "db_awesome";
let db2 = DatabaseName::new("db_awesome").unwrap();
server
.create_database(db2, DatabaseRules::new())
.create_database(DatabaseRules::new(db2.clone()))
.await
.expect("failed to create 2nd db");
@ -744,8 +731,8 @@ mod tests {
server2.set_id(1);
server2.load_database_configs().await.unwrap();
let _ = server2.db(&DatabaseName::new(db2).unwrap()).unwrap();
let _ = server2.db(&DatabaseName::new(name).unwrap()).unwrap();
let _ = server2.db(&db2).unwrap();
let _ = server2.db(&name).unwrap();
}
#[tokio::test]
@ -757,17 +744,17 @@ mod tests {
let server = Server::new(manager, store);
server.set_id(1);
let name = "bananas";
let name = DatabaseName::new("bananas").unwrap();
// Create a database
server
.create_database(name, DatabaseRules::new())
.create_database(DatabaseRules::new(name.clone()))
.await
.expect("failed to create database");
// Then try and create another with the same name
let got = server
.create_database(name, DatabaseRules::new())
.create_database(DatabaseRules::new(name.clone()))
.await
.unwrap_err();
@ -788,8 +775,9 @@ mod tests {
let names = vec!["bar", "baz"];
for name in &names {
let name = DatabaseName::new(name.to_string()).unwrap();
server
.create_database(*name, DatabaseRules::new())
.create_database(DatabaseRules::new(name))
.await
.expect("failed to create database");
}
@ -800,39 +788,15 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn database_name_validation() -> Result {
let manager = TestConnectionManager::new();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server = Server::new(manager, store);
server.set_id(1);
let reject = vec![
"bananas\t",
"bananas\"are\u{0099}\"great",
"bananas\nfoster",
];
for name in reject {
let got = server
.create_database(name, DatabaseRules::new())
.await
.unwrap_err();
if !matches!(got, Error::InvalidDatabaseName { .. }) {
panic!("expected invalid name error");
}
}
Ok(())
}
#[tokio::test]
async fn writes_local() -> Result {
let manager = TestConnectionManager::new();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let server = Server::new(manager, store);
server.set_id(1);
server.create_database("foo", DatabaseRules::new()).await?;
server
.create_database(DatabaseRules::new(DatabaseName::new("foo").unwrap()))
.await?;
let line = "cpu bar=1 10";
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
@ -875,7 +839,7 @@ mod tests {
let db_name = DatabaseName::new("foo").unwrap();
server
.create_database(db_name.as_str(), DatabaseRules::new())
.create_database(DatabaseRules::new(db_name.clone()))
.await?;
let line = "cpu bar=1 10";
@ -932,8 +896,10 @@ mod tests {
let server = Server::new(manager, Arc::clone(&store));
server.set_id(1);
let db_name = "my_db";
let db_name = DatabaseName::new("my_db").unwrap();
let rules = DatabaseRules {
name: db_name.clone(),
partition_template: Default::default(),
wal_buffer_config: Some(WalBufferConfig {
buffer_size: 500,
segment_size: 10,
@ -941,12 +907,13 @@ mod tests {
store_segments: true,
close_segment_after: None,
}),
..Default::default()
lifecycle_rules: Default::default(),
shard_config: None,
};
server.create_database(db_name, rules).await.unwrap();
server.create_database(rules).await.unwrap();
let lines = parsed_lines("disk,host=a used=10.1 12");
server.write_lines(db_name, &lines).await.unwrap();
server.write_lines(db_name.as_str(), &lines).await.unwrap();
// write lines should have caused a segment rollover and persist, wait
tokio::task::yield_now().await;

View File

@ -1,6 +1,7 @@
use data_types::{
chunk::{ChunkStorage, ChunkSummary},
database_rules::DatabaseRules,
DatabaseName,
};
use query::Database;
@ -10,7 +11,7 @@ use std::sync::Arc;
/// Used for testing: create a Database with a local store
pub fn make_db() -> Db {
Db::new(
DatabaseRules::new(),
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
read_buffer::Database::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),

View File

@ -276,6 +276,7 @@ mod tests {
use super::*;
use data_types::database_rules::DatabaseRules;
use data_types::DatabaseName;
use futures::TryStreamExt;
use mutable_buffer::chunk::Chunk as ChunkWB;
use object_store::memory::InMemory;
@ -389,7 +390,7 @@ mem,host=A,region=west used=45 1
/// Create a Database with a local store
pub fn make_db() -> Db {
Db::new(
DatabaseRules::new(),
DatabaseRules::new(DatabaseName::new("placeholder").unwrap()),
ReadBufferDb::new(),
None, // wal buffer
Arc::new(JobRegistry::new()),

View File

@ -755,7 +755,9 @@ mod tests {
));
test_storage.set_id(1);
test_storage
.create_database("MyOrg_MyBucket", DatabaseRules::new())
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -806,7 +808,9 @@ mod tests {
));
test_storage.set_id(1);
test_storage
.create_database("MyOrg_MyBucket", DatabaseRules::new())
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -945,7 +949,9 @@ mod tests {
));
test_storage.set_id(1);
test_storage
.create_database("MyOrg_MyBucket", DatabaseRules::new())
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -995,7 +1001,9 @@ mod tests {
));
test_storage.set_id(1);
test_storage
.create_database("MyOrg_MyBucket", DatabaseRules::new())
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
))
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
@ -1032,7 +1040,8 @@ mod tests {
let database_name = "foo_bar";
let rules = DatabaseRules {
name: database_name.to_owned(),
name: DatabaseName::new(database_name).unwrap(),
partition_template: Default::default(),
wal_buffer_config: Some(WalBufferConfig {
buffer_size: 500,
segment_size: 10,
@ -1040,10 +1049,11 @@ mod tests {
store_segments: true,
close_segment_after: None,
}),
..Default::default()
lifecycle_rules: Default::default(),
shard_config: None,
};
server.create_database(database_name, rules).await.unwrap();
server.create_database(rules).await.unwrap();
let base_url = format!(
"{}/iox/api/v1/databases/{}/wal/meta",

View File

@ -79,10 +79,7 @@ where
.and_then(TryInto::try_into)
.map_err(|e| e.scope("rules"))?;
let name =
DatabaseName::new(rules.name.clone()).expect("protobuf mapping didn't validate name");
match self.server.create_database(name, rules).await {
match self.server.create_database(rules).await {
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
Err(Error::DatabaseAlreadyExists { db_name }) => {
return Err(AlreadyExists {