diff --git a/data_types/src/http.rs b/data_types/src/http.rs index 2f595fb82a..904c2dc129 100644 --- a/data_types/src/http.rs +++ b/data_types/src/http.rs @@ -18,9 +18,3 @@ pub struct WalMetadataQuery { pub struct WalMetadataResponse { pub segments: Vec, } - -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -/// Body of the response to the /databases endpoint. -pub struct ListDatabasesResponse { - pub names: Vec, -} diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 8e057f98e4..10edba3523 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -13,8 +13,7 @@ // Influx crates use arrow_deps::datafusion::physical_plan::collect; use data_types::{ - database_rules::DatabaseRules, - http::{ListDatabasesResponse, WalMetadataQuery}, + http::WalMetadataQuery, names::{org_and_bucket_to_database, OrgBucketMappingError}, DatabaseName, }; @@ -30,7 +29,7 @@ use futures::{self, StreamExt}; use http::header::{CONTENT_ENCODING, CONTENT_TYPE}; use hyper::{Body, Method, Request, Response, StatusCode}; use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use tracing::{debug, error, info}; @@ -314,15 +313,9 @@ where Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::) - .get("/ping", ping) .get("/health", health) - .get("/iox/api/v1/databases", list_databases::) - .put("/iox/api/v1/databases/:name", create_database::) - .get("/iox/api/v1/databases/:name", get_database::) .get("/iox/api/v1/databases/:name/query", query::) .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::) - .put("/iox/api/v1/id", set_writer::) - .get("/iox/api/v1/id", get_writer::) .get("/api/v1/partitions", list_partitions::) .post("/api/v1/snapshot", snapshot_partition::) // Specify the error handler to handle any errors caused by @@ -532,69 +525,6 @@ async fn query( Ok(response) } -#[tracing::instrument(level = "debug")] -async fn list_databases(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - let server = Arc::clone(&req.data::>>().expect("server state")); - - let names = server.db_names_sorted().await; - let json = serde_json::to_string(&ListDatabasesResponse { names }) - .context(InternalSerializationError)?; - Ok(Response::new(Body::from(json))) -} - -#[tracing::instrument(level = "debug")] -async fn create_database( - req: Request, -) -> Result, ApplicationError> { - let server = Arc::clone(&req.data::>>().expect("server state")); - - // with routerify, we shouldn't have gotten here without this being set - let db_name = req - .param("name") - .expect("db name must have been set") - .clone(); - let body = parse_body(req).await?; - - let rules: DatabaseRules = serde_json::from_slice(body.as_ref()).context(InvalidRequestBody)?; - - server - .create_database(db_name, rules) - .await - .context(ErrorCreatingDatabase)?; - - Ok(Response::new(Body::empty())) -} - -#[tracing::instrument(level = "debug")] -async fn get_database( - req: Request, -) -> Result, ApplicationError> { - let server = Arc::clone(&req.data::>>().expect("server state")); - - // with routerify, we shouldn't have gotten here without this being set - let db_name_str = req - .param("name") - .expect("db name must have been set") - .clone(); - let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?; - let db = server - .db_rules(&db_name) - .await - .context(DatabaseNotFound { name: &db_name_str })?; - - let data = serde_json::to_string(&db).context(JsonGenerationError)?; - let response = Response::builder() - .header("Content-Type", "application/json") - .status(StatusCode::OK) - .body(Body::from(data)) - .expect("builder should be successful"); - - Ok(response) -} - #[tracing::instrument(level = "debug")] async fn get_wal_meta( req: Request, @@ -652,73 +582,6 @@ async fn get_wal_meta( Ok(response) } -#[tracing::instrument(level = "debug")] -async fn set_writer( - req: Request, -) -> Result, ApplicationError> { - let server = Arc::clone(&req.data::>>().expect("server state")); - - // Read the request body - let body = parse_body(req).await?; - - // Parse the JSON body into a structure - #[derive(Serialize, Deserialize)] - struct WriterIdBody { - id: u32, - } - let req: WriterIdBody = serde_json::from_slice(body.as_ref()).context(InvalidRequestBody)?; - - // Set the writer ID - server.set_id(req.id); - - // Build a HTTP 200 response - let response = Response::builder() - .status(StatusCode::OK) - .body(Body::from( - serde_json::to_string(&req).expect("json encoding should not fail"), - )) - .expect("builder should be successful"); - - Ok(response) -} - -#[tracing::instrument(level = "debug")] -async fn get_writer( - req: Request, -) -> Result, ApplicationError> { - let id = { - let server = Arc::clone(&req.data::>>().expect("server state")); - server.require_id() - }; - - // Parse the JSON body into a structure - #[derive(Serialize)] - struct WriterIdBody { - id: u32, - } - - let body = WriterIdBody { - id: id.unwrap_or(0), - }; - - // Build a HTTP 200 response - let response = Response::builder() - .status(StatusCode::OK) - .body(Body::from( - serde_json::to_string(&body).expect("json encoding should not fail"), - )) - .expect("builder should be successful"); - - Ok(response) -} - -// Route to test that the server is alive -#[tracing::instrument(level = "debug")] -async fn ping(_: Request) -> Result, ApplicationError> { - let response_body = "PONG"; - Ok(Response::new(Body::from(response_body.to_string()))) -} - #[tracing::instrument(level = "debug")] async fn health(_: Request) -> Result, ApplicationError> { let response_body = "OK"; @@ -849,22 +712,6 @@ mod tests { type Error = Box; type Result = std::result::Result; - #[tokio::test] - async fn test_ping() -> Result<()> { - let test_storage = Arc::new(AppServer::new( - ConnectionManagerImpl {}, - Arc::new(ObjectStore::new_in_memory(InMemory::new())), - )); - let server_url = test_server(Arc::clone(&test_storage)); - - let client = Client::new(); - let response = client.get(&format!("{}/ping", server_url)).send().await; - - // Print the response so if the test fails, we have a log of what went wrong - check_response("ping", response, StatusCode::OK, "PONG").await; - Ok(()) - } - #[tokio::test] async fn test_health() -> Result<()> { let test_storage = Arc::new(AppServer::new( @@ -1123,133 +970,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn set_writer_id() { - let server = Arc::new(AppServer::new( - ConnectionManagerImpl {}, - Arc::new(ObjectStore::new_in_memory(InMemory::new())), - )); - server.set_id(1); - let server_url = test_server(Arc::clone(&server)); - - let data = r#"{"id":42}"#; - - let client = Client::new(); - - let response = client - .put(&format!("{}/iox/api/v1/id", server_url)) - .body(data) - .send() - .await; - - check_response("set_writer_id", response, StatusCode::OK, data).await; - - assert_eq!(server.require_id().expect("should be set"), 42); - - // Check get_writer_id - let response = client - .get(&format!("{}/iox/api/v1/id", server_url)) - .send() - .await; - - check_response("get_writer_id", response, StatusCode::OK, data).await; - } - - #[tokio::test] - async fn list_databases() { - let server = Arc::new(AppServer::new( - ConnectionManagerImpl {}, - Arc::new(ObjectStore::new_in_memory(InMemory::new())), - )); - server.set_id(1); - let server_url = test_server(Arc::clone(&server)); - - let database_names: Vec = vec!["foo_bar", "foo_baz"] - .iter() - .map(|i| i.to_string()) - .collect(); - - for database_name in &database_names { - let rules = DatabaseRules { - name: database_name.clone(), - ..Default::default() - }; - server.create_database(database_name, rules).await.unwrap(); - } - - let client = Client::new(); - let response = client - .get(&format!("{}/iox/api/v1/databases", server_url)) - .send() - .await; - - let data = serde_json::to_string(&ListDatabasesResponse { - names: database_names, - }) - .unwrap(); - check_response("list_databases", response, StatusCode::OK, &data).await; - } - - #[tokio::test] - async fn create_database() { - let server = Arc::new(AppServer::new( - ConnectionManagerImpl {}, - Arc::new(ObjectStore::new_in_memory(InMemory::new())), - )); - server.set_id(1); - let server_url = test_server(Arc::clone(&server)); - - let data = r#"{}"#; - - let database_name = DatabaseName::new("foo_bar").unwrap(); - - let client = Client::new(); - let response = client - .put(&format!( - "{}/iox/api/v1/databases/{}", - server_url, database_name - )) - .body(data) - .send() - .await; - - check_response("create_database", response, StatusCode::OK, "").await; - - server.db(&database_name).await.unwrap(); - let db_rules = server.db_rules(&database_name).await.unwrap(); - assert!(db_rules.mutable_buffer_config.is_some()); - } - - #[tokio::test] - async fn get_database() { - let server = Arc::new(AppServer::new( - ConnectionManagerImpl {}, - Arc::new(ObjectStore::new_in_memory(InMemory::new())), - )); - server.set_id(1); - let server_url = test_server(Arc::clone(&server)); - - let database_name = "foo_bar"; - let rules = DatabaseRules { - name: database_name.to_owned(), - ..Default::default() - }; - let data = serde_json::to_string(&rules).unwrap(); - - server.create_database(database_name, rules).await.unwrap(); - - let client = Client::new(); - let response = client - .get(&format!( - "{}/iox/api/v1/databases/{}", - server_url, database_name - )) - .send() - .await; - - check_response("get_database", response, StatusCode::OK, &data).await; - } - #[tokio::test] async fn get_wal_meta() { let server = Arc::new(AppServer::new(