parent
42de58497e
commit
202a4187a1
|
@ -169,6 +169,9 @@ pub trait DatabaseStore: Debug + Send + Sync {
|
|||
/// The type of error this DataBase store generates
|
||||
type Error: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
/// List the database names.
|
||||
async fn db_names_sorted(&self) -> Vec<String>;
|
||||
|
||||
/// Retrieve the database specified by `name` returning None if no
|
||||
/// such database exists
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
|
||||
|
|
|
@ -542,6 +542,14 @@ impl Default for TestDatabaseStore {
|
|||
impl DatabaseStore for TestDatabaseStore {
|
||||
type Database = TestDatabase;
|
||||
type Error = TestError;
|
||||
|
||||
/// List the database names.
|
||||
async fn db_names_sorted(&self) -> Vec<String> {
|
||||
let databases = self.databases.lock().await;
|
||||
|
||||
databases.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Retrieve the database specified name
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||
let databases = self.databases.lock().await;
|
||||
|
|
|
@ -68,10 +68,15 @@ impl Config {
|
|||
}
|
||||
|
||||
pub(crate) fn host_group(&self, host_group_id: &str) -> Option<Arc<HostGroup>> {
|
||||
let state = self.state.read().expect("mutex poinsoned");
|
||||
let state = self.state.read().expect("mutex poisoned");
|
||||
state.host_groups.get(host_group_id).cloned()
|
||||
}
|
||||
|
||||
pub(crate) fn db_names_sorted(&self) -> Vec<DatabaseName<'static>> {
|
||||
let state = self.state.read().expect("mutex poisoned");
|
||||
state.databases.keys().cloned().collect()
|
||||
}
|
||||
|
||||
fn commit(&self, name: &DatabaseName<'static>, db: Arc<Db>) {
|
||||
let mut state = self.state.write().expect("mutex poisoned");
|
||||
let name = state
|
||||
|
@ -153,6 +158,8 @@ mod test {
|
|||
let db_reservation = config.create_db(name.clone(), rules).unwrap();
|
||||
db_reservation.commit();
|
||||
assert!(config.db(&name).is_some());
|
||||
|
||||
assert_eq!(config.db_names_sorted(), vec![name]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -432,6 +432,14 @@ where
|
|||
type Database = Db;
|
||||
type Error = Error;
|
||||
|
||||
async fn db_names_sorted(&self) -> Vec<String> {
|
||||
self.config
|
||||
.db_names_sorted()
|
||||
.iter()
|
||||
.map(|i| i.clone().into())
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||
if let Ok(name) = DatabaseName::new(name) {
|
||||
return self.db(&name).await;
|
||||
|
@ -676,6 +684,28 @@ mod tests {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn db_names_sorted() -> Result {
|
||||
let manager = TestConnectionManager::new();
|
||||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let server = Server::new(manager, store);
|
||||
server.set_id(1);
|
||||
|
||||
let names = vec!["bar", "baz"];
|
||||
|
||||
for name in &names {
|
||||
server
|
||||
.create_database(*name, DatabaseRules::default())
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
}
|
||||
|
||||
let db_names_sorted = server.db_names_sorted().await;
|
||||
assert_eq!(names, db_names_sorted);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_name_validation() -> Result {
|
||||
let manager = TestConnectionManager::new();
|
||||
|
|
|
@ -98,6 +98,9 @@ pub enum ApplicationError {
|
|||
#[snafu(display("Invalid request body: {}", source))]
|
||||
InvalidRequestBody { source: serde_json::error::Error },
|
||||
|
||||
#[snafu(display("Invalid response body: {}", source))]
|
||||
InternalSerializationError { source: serde_json::error::Error },
|
||||
|
||||
#[snafu(display("Invalid content encoding: {}", content_encoding))]
|
||||
InvalidContentEncoding { content_encoding: String },
|
||||
|
||||
|
@ -159,6 +162,7 @@ impl ApplicationError {
|
|||
Self::ExpectedQueryString { .. } => self.bad_request(),
|
||||
Self::InvalidQueryString { .. } => self.bad_request(),
|
||||
Self::InvalidRequestBody { .. } => self.bad_request(),
|
||||
Self::InternalSerializationError { .. } => self.internal_error(),
|
||||
Self::InvalidContentEncoding { .. } => self.bad_request(),
|
||||
Self::ReadingHeaderAsUtf8 { .. } => self.bad_request(),
|
||||
Self::ReadingBody { .. } => self.bad_request(),
|
||||
|
@ -250,6 +254,7 @@ where
|
|||
.post("/api/v2/write", write::<M>)
|
||||
.get("/ping", ping)
|
||||
.get("/api/v2/read", read::<M>)
|
||||
.get("/iox/api/v1/databases", list_databases::<M>)
|
||||
.put("/iox/api/v1/databases/:name", create_database::<M>)
|
||||
.get("/iox/api/v1/databases/:name", get_database::<M>)
|
||||
.put("/iox/api/v1/id", set_writer::<M>)
|
||||
|
@ -448,6 +453,28 @@ async fn read<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|||
Ok(Response::new(Body::from(results.into_bytes())))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
/// Body of the response to the /databases endpoint.
|
||||
struct ListDatabasesResponse {
|
||||
names: Vec<String>,
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug")]
|
||||
async fn list_databases<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
let server = req
|
||||
.data::<Arc<AppServer<M>>>()
|
||||
.expect("server state")
|
||||
.clone();
|
||||
|
||||
let names = server.db_names_sorted().await;
|
||||
let json = serde_json::to_string(&ListDatabasesResponse { names })
|
||||
.context(InternalSerializationError)?;
|
||||
Ok(Response::new(Body::from(json)))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug")]
|
||||
async fn create_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||
req: Request<Body>,
|
||||
|
@ -828,6 +855,42 @@ mod tests {
|
|||
assert_eq!(server.require_id().expect("should be set"), 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_databases() {
|
||||
let server = Arc::new(AppServer::new(
|
||||
ConnectionManagerImpl {},
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
));
|
||||
server.set_id(1);
|
||||
let server_url = test_server(server.clone());
|
||||
|
||||
let database_names: Vec<String> = vec!["foo_bar", "foo_baz"]
|
||||
.iter()
|
||||
.map(|i| i.to_string())
|
||||
.collect();
|
||||
|
||||
for database_name in &database_names {
|
||||
let rules = DatabaseRules {
|
||||
name: database_name.clone(),
|
||||
store_locally: true,
|
||||
..Default::default()
|
||||
};
|
||||
server.create_database(database_name, rules).await.unwrap();
|
||||
}
|
||||
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
.get(&format!("{}/iox/api/v1/databases", server_url))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let data = serde_json::to_string(&ListDatabasesResponse {
|
||||
names: database_names,
|
||||
})
|
||||
.unwrap();
|
||||
check_response("list_databases", response, StatusCode::OK, &data).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_database() {
|
||||
let server = Arc::new(AppServer::new(
|
||||
|
|
Loading…
Reference in New Issue