feat: remove now redundant parts of the HTTP API (#931)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
040f056fc6
commit
65972ccdfc
|
@ -18,9 +18,3 @@ pub struct WalMetadataQuery {
|
||||||
pub struct WalMetadataResponse {
|
pub struct WalMetadataResponse {
|
||||||
pub segments: Vec<SegmentSummary>,
|
pub segments: Vec<SegmentSummary>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
|
||||||
/// Body of the response to the /databases endpoint.
|
|
||||||
pub struct ListDatabasesResponse {
|
|
||||||
pub names: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,8 +13,7 @@
|
||||||
// Influx crates
|
// Influx crates
|
||||||
use arrow_deps::datafusion::physical_plan::collect;
|
use arrow_deps::datafusion::physical_plan::collect;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
database_rules::DatabaseRules,
|
http::WalMetadataQuery,
|
||||||
http::{ListDatabasesResponse, WalMetadataQuery},
|
|
||||||
names::{org_and_bucket_to_database, OrgBucketMappingError},
|
names::{org_and_bucket_to_database, OrgBucketMappingError},
|
||||||
DatabaseName,
|
DatabaseName,
|
||||||
};
|
};
|
||||||
|
@ -30,7 +29,7 @@ use futures::{self, StreamExt};
|
||||||
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
|
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
|
||||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||||
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
|
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::Deserialize;
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
|
|
||||||
|
@ -314,15 +313,9 @@ where
|
||||||
Ok(res)
|
Ok(res)
|
||||||
})) // this endpoint is for API backward compatibility with InfluxDB 2.x
|
})) // this endpoint is for API backward compatibility with InfluxDB 2.x
|
||||||
.post("/api/v2/write", write::<M>)
|
.post("/api/v2/write", write::<M>)
|
||||||
.get("/ping", ping)
|
|
||||||
.get("/health", health)
|
.get("/health", health)
|
||||||
.get("/iox/api/v1/databases", list_databases::<M>)
|
|
||||||
.put("/iox/api/v1/databases/:name", create_database::<M>)
|
|
||||||
.get("/iox/api/v1/databases/:name", get_database::<M>)
|
|
||||||
.get("/iox/api/v1/databases/:name/query", query::<M>)
|
.get("/iox/api/v1/databases/:name/query", query::<M>)
|
||||||
.get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
|
.get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
|
||||||
.put("/iox/api/v1/id", set_writer::<M>)
|
|
||||||
.get("/iox/api/v1/id", get_writer::<M>)
|
|
||||||
.get("/api/v1/partitions", list_partitions::<M>)
|
.get("/api/v1/partitions", list_partitions::<M>)
|
||||||
.post("/api/v1/snapshot", snapshot_partition::<M>)
|
.post("/api/v1/snapshot", snapshot_partition::<M>)
|
||||||
// Specify the error handler to handle any errors caused by
|
// Specify the error handler to handle any errors caused by
|
||||||
|
@ -532,69 +525,6 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn list_databases<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
|
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
|
|
||||||
|
|
||||||
let names = server.db_names_sorted().await;
|
|
||||||
let json = serde_json::to_string(&ListDatabasesResponse { names })
|
|
||||||
.context(InternalSerializationError)?;
|
|
||||||
Ok(Response::new(Body::from(json)))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn create_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|
||||||
req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
|
||||||
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
|
|
||||||
|
|
||||||
// with routerify, we shouldn't have gotten here without this being set
|
|
||||||
let db_name = req
|
|
||||||
.param("name")
|
|
||||||
.expect("db name must have been set")
|
|
||||||
.clone();
|
|
||||||
let body = parse_body(req).await?;
|
|
||||||
|
|
||||||
let rules: DatabaseRules = serde_json::from_slice(body.as_ref()).context(InvalidRequestBody)?;
|
|
||||||
|
|
||||||
server
|
|
||||||
.create_database(db_name, rules)
|
|
||||||
.await
|
|
||||||
.context(ErrorCreatingDatabase)?;
|
|
||||||
|
|
||||||
Ok(Response::new(Body::empty()))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn get_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|
||||||
req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
|
||||||
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
|
|
||||||
|
|
||||||
// with routerify, we shouldn't have gotten here without this being set
|
|
||||||
let db_name_str = req
|
|
||||||
.param("name")
|
|
||||||
.expect("db name must have been set")
|
|
||||||
.clone();
|
|
||||||
let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?;
|
|
||||||
let db = server
|
|
||||||
.db_rules(&db_name)
|
|
||||||
.await
|
|
||||||
.context(DatabaseNotFound { name: &db_name_str })?;
|
|
||||||
|
|
||||||
let data = serde_json::to_string(&db).context(JsonGenerationError)?;
|
|
||||||
let response = Response::builder()
|
|
||||||
.header("Content-Type", "application/json")
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(Body::from(data))
|
|
||||||
.expect("builder should be successful");
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
#[tracing::instrument(level = "debug")]
|
||||||
async fn get_wal_meta<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
async fn get_wal_meta<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
|
@ -652,73 +582,6 @@ async fn get_wal_meta<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn set_writer<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|
||||||
req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
|
||||||
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
|
|
||||||
|
|
||||||
// Read the request body
|
|
||||||
let body = parse_body(req).await?;
|
|
||||||
|
|
||||||
// Parse the JSON body into a structure
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct WriterIdBody {
|
|
||||||
id: u32,
|
|
||||||
}
|
|
||||||
let req: WriterIdBody = serde_json::from_slice(body.as_ref()).context(InvalidRequestBody)?;
|
|
||||||
|
|
||||||
// Set the writer ID
|
|
||||||
server.set_id(req.id);
|
|
||||||
|
|
||||||
// Build a HTTP 200 response
|
|
||||||
let response = Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(Body::from(
|
|
||||||
serde_json::to_string(&req).expect("json encoding should not fail"),
|
|
||||||
))
|
|
||||||
.expect("builder should be successful");
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn get_writer<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|
||||||
req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
|
||||||
let id = {
|
|
||||||
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
|
|
||||||
server.require_id()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Parse the JSON body into a structure
|
|
||||||
#[derive(Serialize)]
|
|
||||||
struct WriterIdBody {
|
|
||||||
id: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
let body = WriterIdBody {
|
|
||||||
id: id.unwrap_or(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Build a HTTP 200 response
|
|
||||||
let response = Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(Body::from(
|
|
||||||
serde_json::to_string(&body).expect("json encoding should not fail"),
|
|
||||||
))
|
|
||||||
.expect("builder should be successful");
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Route to test that the server is alive
|
|
||||||
#[tracing::instrument(level = "debug")]
|
|
||||||
async fn ping(_: Request<Body>) -> Result<Response<Body>, ApplicationError> {
|
|
||||||
let response_body = "PONG";
|
|
||||||
Ok(Response::new(Body::from(response_body.to_string())))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug")]
|
#[tracing::instrument(level = "debug")]
|
||||||
async fn health(_: Request<Body>) -> Result<Response<Body>, ApplicationError> {
|
async fn health(_: Request<Body>) -> Result<Response<Body>, ApplicationError> {
|
||||||
let response_body = "OK";
|
let response_body = "OK";
|
||||||
|
@ -849,22 +712,6 @@ mod tests {
|
||||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_ping() -> Result<()> {
|
|
||||||
let test_storage = Arc::new(AppServer::new(
|
|
||||||
ConnectionManagerImpl {},
|
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
|
||||||
));
|
|
||||||
let server_url = test_server(Arc::clone(&test_storage));
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
let response = client.get(&format!("{}/ping", server_url)).send().await;
|
|
||||||
|
|
||||||
// Print the response so if the test fails, we have a log of what went wrong
|
|
||||||
check_response("ping", response, StatusCode::OK, "PONG").await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_health() -> Result<()> {
|
async fn test_health() -> Result<()> {
|
||||||
let test_storage = Arc::new(AppServer::new(
|
let test_storage = Arc::new(AppServer::new(
|
||||||
|
@ -1123,133 +970,6 @@ mod tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn set_writer_id() {
|
|
||||||
let server = Arc::new(AppServer::new(
|
|
||||||
ConnectionManagerImpl {},
|
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
|
||||||
));
|
|
||||||
server.set_id(1);
|
|
||||||
let server_url = test_server(Arc::clone(&server));
|
|
||||||
|
|
||||||
let data = r#"{"id":42}"#;
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
|
|
||||||
let response = client
|
|
||||||
.put(&format!("{}/iox/api/v1/id", server_url))
|
|
||||||
.body(data)
|
|
||||||
.send()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
check_response("set_writer_id", response, StatusCode::OK, data).await;
|
|
||||||
|
|
||||||
assert_eq!(server.require_id().expect("should be set"), 42);
|
|
||||||
|
|
||||||
// Check get_writer_id
|
|
||||||
let response = client
|
|
||||||
.get(&format!("{}/iox/api/v1/id", server_url))
|
|
||||||
.send()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
check_response("get_writer_id", response, StatusCode::OK, data).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn list_databases() {
|
|
||||||
let server = Arc::new(AppServer::new(
|
|
||||||
ConnectionManagerImpl {},
|
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
|
||||||
));
|
|
||||||
server.set_id(1);
|
|
||||||
let server_url = test_server(Arc::clone(&server));
|
|
||||||
|
|
||||||
let database_names: Vec<String> = vec!["foo_bar", "foo_baz"]
|
|
||||||
.iter()
|
|
||||||
.map(|i| i.to_string())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for database_name in &database_names {
|
|
||||||
let rules = DatabaseRules {
|
|
||||||
name: database_name.clone(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
server.create_database(database_name, rules).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
let response = client
|
|
||||||
.get(&format!("{}/iox/api/v1/databases", server_url))
|
|
||||||
.send()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let data = serde_json::to_string(&ListDatabasesResponse {
|
|
||||||
names: database_names,
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
check_response("list_databases", response, StatusCode::OK, &data).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn create_database() {
|
|
||||||
let server = Arc::new(AppServer::new(
|
|
||||||
ConnectionManagerImpl {},
|
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
|
||||||
));
|
|
||||||
server.set_id(1);
|
|
||||||
let server_url = test_server(Arc::clone(&server));
|
|
||||||
|
|
||||||
let data = r#"{}"#;
|
|
||||||
|
|
||||||
let database_name = DatabaseName::new("foo_bar").unwrap();
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
let response = client
|
|
||||||
.put(&format!(
|
|
||||||
"{}/iox/api/v1/databases/{}",
|
|
||||||
server_url, database_name
|
|
||||||
))
|
|
||||||
.body(data)
|
|
||||||
.send()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
check_response("create_database", response, StatusCode::OK, "").await;
|
|
||||||
|
|
||||||
server.db(&database_name).await.unwrap();
|
|
||||||
let db_rules = server.db_rules(&database_name).await.unwrap();
|
|
||||||
assert!(db_rules.mutable_buffer_config.is_some());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn get_database() {
|
|
||||||
let server = Arc::new(AppServer::new(
|
|
||||||
ConnectionManagerImpl {},
|
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
|
||||||
));
|
|
||||||
server.set_id(1);
|
|
||||||
let server_url = test_server(Arc::clone(&server));
|
|
||||||
|
|
||||||
let database_name = "foo_bar";
|
|
||||||
let rules = DatabaseRules {
|
|
||||||
name: database_name.to_owned(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
let data = serde_json::to_string(&rules).unwrap();
|
|
||||||
|
|
||||||
server.create_database(database_name, rules).await.unwrap();
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
let response = client
|
|
||||||
.get(&format!(
|
|
||||||
"{}/iox/api/v1/databases/{}",
|
|
||||||
server_url, database_name
|
|
||||||
))
|
|
||||||
.send()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
check_response("get_database", response, StatusCode::OK, &data).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn get_wal_meta() {
|
async fn get_wal_meta() {
|
||||||
let server = Arc::new(AppServer::new(
|
let server = Arc::new(AppServer::new(
|
||||||
|
|
Loading…
Reference in New Issue