Merge pull request #689 from influxdata/pd-load-database-configs

pull/24376/head
Carol (Nichols || Goulding) 2021-01-25 10:02:02 -05:00 committed by GitHub
commit b7663970f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 131 additions and 15 deletions

View File

@ -19,6 +19,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// subscribers, and querying data for a single database.
#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
pub struct DatabaseRules {
/// The unencoded name of the database. This gets put in by the create
/// database call, so an empty default is fine.
#[serde(default)]
pub name: String,
/// Template that generates a partition key for each row inserted into the
/// db
#[serde(default)]

View File

@ -13,7 +13,7 @@ use std::{
sync::{Arc, RwLock},
};
const DB_RULES_FILE_NAME: &str = "rules.json";
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.json";
/// The Config tracks the configuration of databases and their rules along
/// with host groups for replication. It is used as an in-memory structure

View File

@ -76,7 +76,10 @@ use std::sync::{
Arc,
};
use crate::{config::Config, db::Db};
use crate::{
config::{object_store_path_for_database_config, Config, DB_RULES_FILE_NAME},
db::Db,
};
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::{DatabaseRules, HostGroup, HostGroupId, MatchTables},
@ -86,9 +89,9 @@ use influxdb_line_protocol::ParsedLine;
use object_store::{path::ObjectStorePath, ObjectStore};
use query::{exec::Executor, Database, DatabaseStore};
use crate::config::object_store_path_for_database_config;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::TryStreamExt;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{error, info};
@ -142,7 +145,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
id: AtomicU32,
config: Config,
config: Arc<Config>,
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
@ -152,7 +155,7 @@ impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
Self {
id: AtomicU32::new(SERVER_ID_NOT_SET),
config: Config::default(),
config: Arc::new(Config::default()),
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
@ -180,12 +183,14 @@ impl<M: ConnectionManager> Server<M> {
pub async fn create_database(
&self,
db_name: impl Into<String>,
rules: DatabaseRules,
mut rules: DatabaseRules,
) -> Result<()> {
// Return an error if this server hasn't yet been setup with an id
let id = self.require_id()?;
let db_name = DatabaseName::new(db_name.into()).context(InvalidDatabaseName)?;
let name = db_name.into();
let db_name = DatabaseName::new(name.clone()).context(InvalidDatabaseName)?;
rules.name = name;
let db_reservation = self.config.create_db(db_name, rules)?;
@ -212,6 +217,68 @@ impl<M: ConnectionManager> Server<M> {
Ok(())
}
/// Loads the database configurations based on the databases in the
/// object store. Any databases in the config already won't be
/// replaced.
pub async fn load_database_configs(&self) -> Result<()> {
let id = self.require_id()?;
let root_path = server_object_store_path(id);
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = self
.store
.list_with_delimiter(&root_path)
.await
.context(StoreError)?;
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.map(|mut path| {
let store = self.store.clone();
let config = self.config.clone();
path.set_file_name(DB_RULES_FILE_NAME);
tokio::task::spawn(async move {
let mut res = get_store_bytes(&path, &store).await;
while let Err(e) = &res {
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::delay_for(tokio::time::Duration::from_secs(
STORE_ERROR_PAUSE_SECONDS,
))
.await;
res = get_store_bytes(&path, &store).await;
}
let res = res.unwrap();
match serde_json::from_slice::<DatabaseRules>(&res) {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
Ok(rules) => match DatabaseName::new(rules.name.clone()) {
Err(e) => error!("error parsing name {} from rules: {}", rules.name, e),
Ok(name) => match config.create_db(name, rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => handle.commit(),
},
},
}
})
})
.collect();
futures::future::join_all(handles).await;
Ok(())
}
/// Creates a host group with a set of connection strings to hosts. These
/// host connection strings should be something that the connection
/// manager can use to return a remote server to work with.
@ -494,6 +561,23 @@ fn persist_bytes_in_background(data: Bytes, store: Arc<ObjectStore>, location: O
});
}
/// Reads the object at `location` from `store` and returns its contents as
/// one contiguous buffer.
///
/// The stream returned by `store.get` is drained chunk by chunk and the
/// chunks are concatenated. A failure to open the stream or to read any
/// chunk is returned with the `StoreError` context.
async fn get_store_bytes(
    location: &ObjectStorePath,
    store: &ObjectStore,
) -> Result<bytes::BytesMut> {
    let chunks = store.get(location).await.context(StoreError)?;

    chunks
        .map_ok(|chunk| bytes::BytesMut::from(&chunk[..]))
        .try_concat()
        .await
        .context(StoreError)
}
#[cfg(test)]
mod tests {
use super::*;
@ -551,6 +635,7 @@ mod tests {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
name: name.to_string(),
..Default::default()
};
@ -576,6 +661,25 @@ mod tests {
let read_rules = serde_json::from_str::<DatabaseRules>(read_data).unwrap();
assert_eq!(rules, read_rules);
let db2 = "db_awesome";
server
.create_database(db2, DatabaseRules::default())
.await
.expect("failed to create 2nd db");
store
.list_with_delimiter(&ObjectStorePath::from_cloud_unchecked(""))
.await
.unwrap();
let manager = TestConnectionManager::new();
let server2 = Server::new(manager, store);
server2.set_id(1);
server2.load_database_configs().await.unwrap();
let _ = server2.db(&DatabaseName::new(db2).unwrap()).await.unwrap();
let _ = server2.db(&DatabaseName::new(name).unwrap()).await.unwrap();
}
#[tokio::test]

View File

@ -90,8 +90,8 @@ pub struct Config {
pub grpc_bind_address: SocketAddr,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR", default_value = &DEFAULT_DATA_DIR)]
pub database_directory: PathBuf,
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
/// If using Google Cloud Storage for the object store, this item, as well
/// as SERVICE_ACCOUNT must be set.

View File

@ -1,4 +1,4 @@
use tracing::{info, warn};
use tracing::{error, info, warn};
use std::fs;
use std::net::SocketAddr;
@ -97,14 +97,16 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
let db_dir = &config.database_directory;
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectory { path: db_dir })?;
let object_store = if let Some(bucket_name) = &config.gcp_bucket {
info!("Using GCP bucket {} for storage", bucket_name);
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name))
} else {
} else if let Some(db_dir) = db_dir {
info!("Using local dir {:?} for storage", db_dir);
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectory { path: db_dir })?;
ObjectStore::new_file(object_store::disk::File::new(&db_dir))
} else {
warn!("NO PERSISTENCE: using memory for object storage");
ObjectStore::new_in_memory(object_store::memory::InMemory::new())
};
let object_storage = Arc::new(object_store);
@ -115,6 +117,12 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
// call
if let Some(id) = config.writer_id {
app_server.set_id(id);
if let Err(e) = app_server.load_database_configs().await {
error!(
"unable to load database configurations from object storage: {}",
e
)
}
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}

View File

@ -959,13 +959,14 @@ mod tests {
server.set_id(1);
let server_url = test_server(server.clone());
let database_name = "foo_bar";
let rules = DatabaseRules {
name: database_name.to_owned(),
store_locally: true,
..Default::default()
};
let data = serde_json::to_string(&rules).unwrap();
let database_name = "foo_bar";
server.create_database(database_name, rules).await.unwrap();
let client = Client::new();

View File

@ -893,7 +893,6 @@ impl TestServer {
.unwrap()
// Can enable for debugging
//.arg("-vv")
.env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()
.unwrap();