chore: remove generic term from HttpApi (#26210)
parent
c7854363c4
commit
f881c5844b
|
@ -73,7 +73,7 @@ pub struct WithPersister(Arc<Persister>);
|
|||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct NoTimeProvider;
|
||||
#[derive(Debug)]
|
||||
pub struct WithTimeProvider<T>(Arc<T>);
|
||||
pub struct WithTimeProvider(Arc<dyn TimeProvider>);
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct NoListener;
|
||||
#[derive(Debug)]
|
||||
|
@ -139,7 +139,10 @@ impl<W, Q, T, L, E> ServerBuilder<W, Q, NoPersister, T, L, E> {
|
|||
}
|
||||
|
||||
impl<W, Q, P, L, E> ServerBuilder<W, Q, P, NoTimeProvider, L, E> {
|
||||
pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>, L, E> {
|
||||
pub fn time_provider(
|
||||
self,
|
||||
tp: Arc<dyn TimeProvider>,
|
||||
) -> ServerBuilder<W, Q, P, WithTimeProvider, L, E> {
|
||||
ServerBuilder {
|
||||
common_state: self.common_state,
|
||||
time_provider: WithTimeProvider(tp),
|
||||
|
@ -189,17 +192,17 @@ impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L, NoProcessingEngine> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: TimeProvider>
|
||||
impl
|
||||
ServerBuilder<
|
||||
WithWriteBuf,
|
||||
WithQueryExec,
|
||||
WithPersister,
|
||||
WithTimeProvider<T>,
|
||||
WithTimeProvider,
|
||||
WithListener,
|
||||
WithProcessingEngine,
|
||||
>
|
||||
{
|
||||
pub async fn build(self) -> Server<T> {
|
||||
pub async fn build(self) -> Server {
|
||||
let persister = Arc::clone(&self.persister.0);
|
||||
let authorizer = Arc::clone(&self.authorizer);
|
||||
let processing_engine = Arc::clone(&self.processing_engine.0);
|
||||
|
|
|
@ -489,21 +489,21 @@ impl IntoResponse for Error {
|
|||
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct HttpApi<T> {
|
||||
pub(crate) struct HttpApi {
|
||||
common_state: CommonServerState,
|
||||
write_buffer: Arc<dyn WriteBuffer>,
|
||||
processing_engine: Arc<ProcessingEngineManagerImpl>,
|
||||
time_provider: Arc<T>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
pub(crate) query_executor: Arc<dyn QueryExecutor>,
|
||||
max_request_bytes: usize,
|
||||
authorizer: Arc<dyn Authorizer>,
|
||||
legacy_write_param_unifier: SingleTenantRequestUnifier,
|
||||
}
|
||||
|
||||
impl<T> HttpApi<T> {
|
||||
impl HttpApi {
|
||||
pub(crate) fn new(
|
||||
common_state: CommonServerState,
|
||||
time_provider: Arc<T>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
write_buffer: Arc<dyn WriteBuffer>,
|
||||
query_executor: Arc<dyn QueryExecutor>,
|
||||
processing_engine: Arc<ProcessingEngineManagerImpl>,
|
||||
|
@ -524,10 +524,7 @@ impl<T> HttpApi<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> HttpApi<T>
|
||||
where
|
||||
T: TimeProvider,
|
||||
{
|
||||
impl HttpApi {
|
||||
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
|
||||
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
|
||||
let params: WriteParams = serde_urlencoded::from_str(query)?;
|
||||
|
@ -1606,8 +1603,8 @@ async fn record_batch_stream_to_body(
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn route_request<T: TimeProvider>(
|
||||
http_server: Arc<HttpApi<T>>,
|
||||
pub(crate) async fn route_request(
|
||||
http_server: Arc<HttpApi>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, Infallible> {
|
||||
if let Err(e) = http_server.authorize_request(&mut req).await {
|
||||
|
|
|
@ -26,7 +26,6 @@ use futures::{Stream, StreamExt, ready, stream::Fuse};
|
|||
use hyper::http::HeaderValue;
|
||||
use hyper::{Body, Request, Response, StatusCode, header::ACCEPT, header::CONTENT_TYPE};
|
||||
use influxdb_influxql_parser::select::{Dimension, GroupByClause};
|
||||
use iox_time::TimeProvider;
|
||||
use observability_deps::tracing::info;
|
||||
use regex::Regex;
|
||||
use schema::{INFLUXQL_MEASUREMENT_COLUMN_NAME, InfluxColumnType, TIME_COLUMN_NAME};
|
||||
|
@ -37,10 +36,7 @@ use super::{Error, HttpApi, Result};
|
|||
|
||||
const DEFAULT_CHUNK_SIZE: usize = 10_000;
|
||||
|
||||
impl<T> HttpApi<T>
|
||||
where
|
||||
T: TimeProvider,
|
||||
{
|
||||
impl HttpApi {
|
||||
/// Implements the v1 query API for InfluxDB
|
||||
///
|
||||
/// Accepts the URL parameters, defined by [`QueryParams`]), and returns a stream
|
||||
|
|
|
@ -29,7 +29,6 @@ use hyper::server::conn::Http;
|
|||
use hyper::service::service_fn;
|
||||
use influxdb3_telemetry::store::TelemetryStore;
|
||||
use influxdb3_write::persister::Persister;
|
||||
use iox_time::TimeProvider;
|
||||
use observability_deps::tracing::error;
|
||||
use observability_deps::tracing::info;
|
||||
use service::hybrid;
|
||||
|
@ -116,28 +115,25 @@ impl CommonServerState {
|
|||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug)]
|
||||
pub struct Server<T> {
|
||||
pub struct Server {
|
||||
common_state: CommonServerState,
|
||||
http: Arc<HttpApi<T>>,
|
||||
http: Arc<HttpApi>,
|
||||
persister: Arc<Persister>,
|
||||
authorizer: Arc<dyn Authorizer>,
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl<T> Server<T> {
|
||||
impl Server {
|
||||
pub fn authorizer(&self) -> Arc<dyn Authorizer> {
|
||||
Arc::clone(&self.authorizer)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn serve<T>(
|
||||
server: Server<T>,
|
||||
pub async fn serve(
|
||||
server: Server,
|
||||
shutdown: CancellationToken,
|
||||
startup_timer: Instant,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: TimeProvider,
|
||||
{
|
||||
) -> Result<()> {
|
||||
let req_metrics = RequestMetrics::new(
|
||||
Arc::clone(&server.common_state.metrics),
|
||||
MetricFamily::HttpServer,
|
||||
|
@ -840,7 +836,7 @@ mod tests {
|
|||
.query_executor(query_executor)
|
||||
.persister(persister)
|
||||
.authorizer(Arc::new(DefaultAuthorizer))
|
||||
.time_provider(Arc::clone(&time_provider))
|
||||
.time_provider(Arc::clone(&time_provider) as _)
|
||||
.tcp_listener(listener)
|
||||
.processing_engine(processing_engine)
|
||||
.build()
|
||||
|
|
Loading…
Reference in New Issue