Merge pull request #866 from influxdata/readapi

feat: Implement /iox/api/v1/databases/:name/query?q=
pull/24376/head
kodiakhq[bot] 2021-02-24 11:24:15 +00:00 committed by GitHub
commit b7bb601b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 99 additions and 0 deletions

View File

@ -263,6 +263,7 @@ where
.get("/iox/api/v1/databases", list_databases::<M>)
.put("/iox/api/v1/databases/:name", create_database::<M>)
.get("/iox/api/v1/databases/:name", get_database::<M>)
.get("/iox/api/v1/databases/:name/query", query::<M>)
.get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
.put("/iox/api/v1/id", set_writer::<M>)
.get("/iox/api/v1/id", get_writer::<M>)
@ -455,6 +456,54 @@ async fn read<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(Response::new(Body::from(results.into_bytes())))
}
#[derive(Deserialize, Debug)]
/// Body of the request to the .../query endpoint
struct QueryInfo {
q: String,
}
// TODO: figure out how to stream read results out rather than rendering the
// whole thing in mem
#[tracing::instrument(level = "debug")]
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
let query = req.uri().query().context(ExpectedQueryString {})?;
let query_info: QueryInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
query_string: query,
})?;
let db_name_str = req
.param("name")
.expect("db name must have been set")
.clone();
let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?;
let db = server
.db(&db_name)
.await
.context(DatabaseNotFound { name: &db_name_str })?;
let planner = SQLQueryPlanner::default();
let executor = server.executor();
let physical_plan = planner
.query(db.as_ref(), &query_info.q, executor.as_ref())
.await
.context(PlanningSQLQuery { query })?;
let batches = collect(physical_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(Query { db_name })?;
let results = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
Ok(Response::new(Body::from(results.into_bytes())))
}
#[tracing::instrument(level = "debug")]
async fn list_databases<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
@ -833,6 +882,56 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_query() -> Result<()> {
let test_storage = Arc::new(AppServer::new(
ConnectionManagerImpl {},
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
));
test_storage.set_id(1);
test_storage
.create_database("MyOrg_MyBucket", DatabaseRules::new())
.await
.unwrap();
let server_url = test_server(Arc::clone(&test_storage));
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, "").await;
// send query data
let response = client
.get(&format!(
"{}/iox/api/v1/databases/MyOrg_MyBucket/query?q={}",
server_url, "select%20*%20from%20h2o_temperature"
))
.send()
.await;
let res = "+----------------+--------------+-------+-----------------+------------+\n\
| bottom_degrees | location | state | surface_degrees | time |\n\
+----------------+--------------+-------+-----------------+------------+\n\
| 50.4 | santa_monica | CA | 65.2 | 1568756160 |\n\
+----------------+--------------+-------+-----------------+------------+\n";
check_response("query", response, StatusCode::OK, res).await;
Ok(())
}
fn gzip_str(s: &str) -> Vec<u8> {
use flate2::{write::GzEncoder, Compression};
use std::io::Write;