feat: add loading of database rules on startup

This adds functionality to the server to load database rules on startup. Follow-on work will update the rules to store additional data (the catalog) and ensure that updates to the catalog can occur as outlined in #651. This work also updates the configuration to not require a database directory so the server can run entirely in memory. I needed this to get the end-to-end test passing since the file object store API doesn't yet have the functionality needed. I've logged #688 to track adding that in.
pull/24376/head
Paul Dix 2021-01-21 18:25:34 -05:00
parent 124d603745
commit 45d25fcbcd
7 changed files with 134 additions and 14 deletions

View File

@ -19,6 +19,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// subscribers, and querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
pub struct DatabaseRules {
/// The unencoded name of the database. This gets put in by the create
/// database call, so an empty default is fine.
#[serde(default)]
pub name: String,
/// Template that generates a partition key for each row inserted into the
/// db
#[serde(default)]

View File

@ -13,7 +13,7 @@ use std::{
sync::{Arc, RwLock},
};
const DB_RULES_FILE_NAME: &str = "rules.json";
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
/// The Config tracks the configuration of databases and their rules along
/// with host groups for replication. It is used as an in-memory structure

View File

@ -76,7 +76,10 @@ use std::sync::{
Arc,
};
use crate::{config::Config, db::Db};
use crate::{
config::{object_store_path_for_database_config, Config, DB_RULES_FILE_NAME},
db::Db,
};
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, HostGroup, HostGroupId, MatchTables},
@ -86,9 +89,9 @@ use influxdb_line_protocol::ParsedLine;
use object_store::{path::ObjectStorePath, ObjectStore};
use query::{exec::Executor, Database, DatabaseStore};
use crate::config::object_store_path_for_database_config;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::TryStreamExt;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{error, info};
@ -142,7 +145,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
id: AtomicU32,
config: Config,
config: Arc<Config>,
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
@ -152,7 +155,7 @@ impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
Self {
id: AtomicU32::new(SERVER_ID_NOT_SET),
config: Config::default(),
config: Arc::new(Config::default()),
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
@ -185,7 +188,10 @@ impl<M: ConnectionManager> Server<M> {
// Return an error if this server hasn't yet been setup with an id
let id = self.require_id()?;
let db_name = DatabaseName::new(db_name.into()).context(InvalidDatabaseName)?;
let name = db_name.into();
let db_name = DatabaseName::new(name.clone()).context(InvalidDatabaseName)?;
let mut rules = rules;
rules.name = name;
let db_reservation = self.config.create_db(db_name, rules)?;
@ -212,6 +218,68 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// Loads the database configurations based on the databases in the
/// object store. Any databases already present in the config won't be
/// replaced (`create_db` rejects duplicates; the error is logged and the
/// stored rules for that database are skipped).
///
/// Requires the server id to already be set (`require_id`); returns an
/// error otherwise, or if the initial object-store listing fails.
pub async fn load_database_configs(&self) -> Result<()> {
let id = self.require_id()?;
// Root prefix under which all of this server's database dirs live.
let root_path = server_object_store_path(id);
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = self
.store
.list_with_delimiter(&root_path)
.await
.context(StoreError)?;
// Spawn one task per database prefix so the rules files are fetched
// and parsed concurrently rather than sequentially.
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.map(|mut path| {
// Clone the Arcs so each spawned task owns its handles.
let store = self.store.clone();
let config = self.config.clone();
// Each prefix is a database directory; the rules live in a
// well-known file name inside it.
path.set_file_name(DB_RULES_FILE_NAME);
tokio::task::spawn(async move {
let mut res = get_store_bytes(&path, &store).await;
// Retry store fetches forever with a fixed pause between
// attempts; transient store errors should not drop a database.
// NOTE(review): a permanently failing fetch makes this task
// (and load_database_configs) never complete — confirm
// that is the intended behavior.
while let Err(e) = &res {
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::delay_for(tokio::time::Duration::from_secs(
STORE_ERROR_PAUSE_SECONDS,
))
.await;
res = get_store_bytes(&path, &store).await;
}
// Safe: the loop above only exits once `res` is Ok.
let res = res.unwrap();
// Parse/validation failures are logged and the database is
// skipped rather than failing the whole startup load.
match serde_json::from_slice::<DatabaseRules>(&res) {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
Ok(rules) => match DatabaseName::new(rules.name.clone()) {
Err(e) => error!("error parsing name {} from rules: {}", rules.name, e),
Ok(name) => match config.create_db(name, rules) {
Err(e) => error!("error adding database to config: {}", e),
// Commit makes the reserved database visible in the config.
Ok(handle) => handle.commit(),
},
},
}
})
})
.collect();
// Wait for every per-database task; individual failures were already
// logged inside the tasks, so the overall result is Ok.
futures::future::join_all(handles).await;
Ok(())
}
/// Creates a host group with a set of connection strings to hosts. These
/// host connection strings should be something that the connection
/// manager can use to return a remote server to work with.
@ -494,6 +562,23 @@ fn persist_bytes_in_background(data: Bytes, store: Arc<ObjectStore>, location: O
});
}
/// Fetches the object at `location` from the object store and returns its
/// contents as a single contiguous buffer.
///
/// `get` yields a stream of byte chunks; each chunk is copied into a
/// `BytesMut` and the chunks are concatenated so callers (e.g. the rules
/// loader) can deserialize from one slice.
///
/// # Errors
///
/// Returns `StoreError` if the initial `get` fails or if reading any chunk
/// from the stream fails.
async fn get_store_bytes(
    location: &ObjectStorePath,
    store: &Arc<ObjectStore>,
) -> Result<bytes::BytesMut> {
    let buf = store
        // `location` is already a reference; the previous `&location`
        // created a needless `&&ObjectStorePath` (clippy::needless_borrow).
        .get(location)
        .await
        .context(StoreError)?
        .map_ok(|chunk| bytes::BytesMut::from(&chunk[..]))
        .try_concat()
        .await
        .context(StoreError)?;

    Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
@ -551,6 +636,7 @@ mod tests {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
name: name.to_string(),
..Default::default()
};
@ -576,6 +662,28 @@ mod tests {
let read_rules = serde_json::from_str::<DatabaseRules>(read_data).unwrap();
assert_eq!(rules, read_rules);
let db2 = "db_awesome";
server
.create_database(db2, DatabaseRules::default())
.await
.expect("failed to create 2nd db");
println!(
"prefixes = {:?}",
store
.list_with_delimiter(&ObjectStorePath::from_cloud_unchecked(""))
.await
.unwrap()
);
let manager = TestConnectionManager::new();
let server2 = Server::new(manager, store);
server2.set_id(1);
server2.load_database_configs().await.unwrap();
let _ = server2.db(&DatabaseName::new(db2).unwrap()).await.unwrap();
let _ = server2.db(&DatabaseName::new(name).unwrap()).await.unwrap();
}
#[tokio::test]

View File

@ -90,8 +90,8 @@ pub struct Config {
pub grpc_bind_address: SocketAddr,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR", default_value = &DEFAULT_DATA_DIR)]
pub database_directory: PathBuf,
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
/// If using Google Cloud Storage for the object store, this item, as well
/// as SERVICE_ACCOUNT must be set.

View File

@ -1,4 +1,4 @@
use tracing::{info, warn};
use tracing::{error, info, warn};
use std::fs;
use std::net::SocketAddr;
@ -97,14 +97,16 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
let db_dir = &config.database_directory;
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectory { path: db_dir })?;
let object_store = if let Some(bucket_name) = &config.gcp_bucket {
info!("Using GCP bucket {} for storage", bucket_name);
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name))
} else {
} else if let Some(db_dir) = db_dir {
info!("Using local dir {:?} for storage", db_dir);
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectory { path: db_dir })?;
ObjectStore::new_file(object_store::disk::File::new(&db_dir))
} else {
warn!("NO PERSISTENCE: using memory for object storage");
ObjectStore::new_in_memory(object_store::memory::InMemory::new())
};
let object_storage = Arc::new(object_store);
@ -115,6 +117,12 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
// call
if let Some(id) = config.writer_id {
app_server.set_id(id);
if let Err(e) = app_server.load_database_configs().await {
error!(
"unable to load database configurations from object storage: {}",
e
)
}
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}

View File

@ -959,13 +959,14 @@ mod tests {
server.set_id(1);
let server_url = test_server(server.clone());
let database_name = "foo_bar";
let rules = DatabaseRules {
name: database_name.to_string(),
store_locally: true,
..Default::default()
};
let data = serde_json::to_string(&rules).unwrap();
let database_name = "foo_bar";
server.create_database(database_name, rules).await.unwrap();
let client = Client::new();

View File

@ -893,7 +893,6 @@ impl TestServer {
.unwrap()
// Can enable for debugging
//.arg("-vv")
.env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()
.unwrap();