diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 7ebe08bf90..88ba5cfe47 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -14,6 +14,7 @@ pub const TIME_COLUMN_NAME: &str = "time"; pub mod data; pub mod database_rules; pub mod error; +pub mod names; pub mod partition_metadata; pub mod table_schema; diff --git a/src/server.rs b/data_types/src/names.rs similarity index 92% rename from src/server.rs rename to data_types/src/names.rs index 6f88cdac18..133719dbdc 100644 --- a/src/server.rs +++ b/data_types/src/names.rs @@ -1,7 +1,4 @@ -pub mod http_routes; -pub mod rpc; - -use data_types::{DatabaseName, DatabaseNameError}; +use crate::{DatabaseName, DatabaseNameError}; use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] @@ -22,7 +19,7 @@ pub enum OrgBucketMappingError { /// This function ensures the mapping is unambiguous by requiring both `org` and /// `bucket` to not contain the `_` character in addition to the /// [`DatabaseName`] validation. -pub(crate) fn org_and_bucket_to_database<'a, O: AsRef, B: AsRef>( +pub fn org_and_bucket_to_database<'a, O: AsRef, B: AsRef>( org: O, bucket: B, ) -> Result, OrgBucketMappingError> { diff --git a/src/commands/influxdb_ioxd.rs b/src/influxdb_ioxd.rs similarity index 95% rename from src/commands/influxdb_ioxd.rs rename to src/influxdb_ioxd.rs index 905149eb0f..638a70a308 100644 --- a/src/commands/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -5,8 +5,9 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -use crate::server::http_routes; -use crate::server::rpc::service; +pub mod http_routes; +pub mod rpc; + use server::{ConnectionManagerImpl as ConnectionManager, Server as AppServer}; use hyper::Server; @@ -16,7 +17,7 @@ use snafu::{ResultExt, Snafu}; use panic_logging::SendPanicsToTracing; -use super::{ +use crate::commands::{ config::{load_config, Config}, logging::LoggingLevel, }; @@ -65,9 +66,7 @@ pub enum Error { ServingHttp { source: hyper::error::Error }, #[snafu(display("Error serving RPC: {}", source))] - ServingRPC { - source: crate::server::rpc::service::Error, - }, + ServingRPC { source: self::rpc::service::Error }, } pub type Result = std::result::Result; @@ -127,7 +126,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option) -> Result .await .context(StartListeningGrpc { grpc_bind_addr })?; - let grpc_server = service::make_server(socket, app_server.clone()); + let grpc_server = self::rpc::service::make_server(socket, app_server.clone()); info!(bind_address=?grpc_bind_addr, "gRPC server listening"); diff --git a/src/server/http_routes.rs b/src/influxdb_ioxd/http_routes.rs similarity index 99% rename from src/server/http_routes.rs rename to src/influxdb_ioxd/http_routes.rs index f52cbe326b..36efce4890 100644 --- a/src/server/http_routes.rs +++ b/src/influxdb_ioxd/http_routes.rs @@ -9,11 +9,13 @@ //! Long term, we expect to create IOx specific api in terms of //! database names and may remove this quasi /v2 API. -use super::{org_and_bucket_to_database, OrgBucketMappingError}; - // Influx crates use arrow_deps::{arrow, datafusion::physical_plan::collect}; -use data_types::{database_rules::DatabaseRules, DatabaseName}; +use data_types::{ + database_rules::DatabaseRules, + names::{org_and_bucket_to_database, OrgBucketMappingError}, + DatabaseName, +}; use influxdb_line_protocol::parse_lines; use object_store::path::ObjectStorePath; use query::{frontend::sql::SQLQueryPlanner, Database, DatabaseStore}; diff --git a/src/server/rpc.rs b/src/influxdb_ioxd/rpc.rs similarity index 100% rename from src/server/rpc.rs rename to src/influxdb_ioxd/rpc.rs diff --git a/src/server/rpc/data.rs b/src/influxdb_ioxd/rpc/data.rs similarity index 100% rename from src/server/rpc/data.rs rename to src/influxdb_ioxd/rpc/data.rs diff --git a/src/server/rpc/expr.rs b/src/influxdb_ioxd/rpc/expr.rs similarity index 100% rename from src/server/rpc/expr.rs rename to src/influxdb_ioxd/rpc/expr.rs diff --git a/src/server/rpc/id.rs b/src/influxdb_ioxd/rpc/id.rs similarity index 100% rename from src/server/rpc/id.rs rename to src/influxdb_ioxd/rpc/id.rs diff --git a/src/server/rpc/input.rs b/src/influxdb_ioxd/rpc/input.rs similarity index 100% rename from src/server/rpc/input.rs rename to src/influxdb_ioxd/rpc/input.rs diff --git a/src/server/rpc/service.rs b/src/influxdb_ioxd/rpc/service.rs similarity index 98% rename from src/server/rpc/service.rs rename to src/influxdb_ioxd/rpc/service.rs index 4c2dae8166..bd164d59b4 100644 --- a/src/server/rpc/service.rs +++ b/src/influxdb_ioxd/rpc/service.rs @@ -16,16 +16,13 @@ use generated_types::{ use data_types::error::ErrorLogger; -#[allow(unused_imports)] -// For some reason rust thinks these imports are unused, but then -// complains of unresolved imports if they are not imported. -use generated_types::{node, Node}; use query::exec::fieldlist::FieldList; use query::group_by::GroupByAndAggregate; -use crate::server::org_and_bucket_to_database; -use crate::server::rpc::expr::{self, AddRPCNode, Loggable, SpecialTagKeys}; -use crate::server::rpc::input::GrpcInputs; +use super::expr::{self, AddRPCNode, Loggable, SpecialTagKeys}; +use super::input::GrpcInputs; +use data_types::names::org_and_bucket_to_database; + use data_types::DatabaseName; use query::{ @@ -110,13 +107,13 @@ pub enum Error { #[snafu(display("Error converting Predicate '{}: {}", rpc_predicate_string, source))] ConvertingPredicate { rpc_predicate_string: String, - source: crate::server::rpc::expr::Error, + source: super::expr::Error, }, #[snafu(display("Error converting group type '{}': {}", aggregate_string, source))] ConvertingReadGroupType { aggregate_string: String, - source: crate::server::rpc::expr::Error, + source: super::expr::Error, }, #[snafu(display( @@ -126,7 +123,7 @@ pub enum Error { ))] ConvertingReadGroupAggregate { aggregate_string: String, - source: crate::server::rpc::expr::Error, + source: super::expr::Error, }, #[snafu(display( @@ -136,7 +133,7 @@ pub enum Error { ))] ConvertingWindowAggregate { aggregate_string: String, - source: crate::server::rpc::expr::Error, + source: super::expr::Error, }, #[snafu(display("Error computing series: {}", source))] @@ -149,14 +146,10 @@ pub enum Error { ComputingGroupedSeriesSet { source: SeriesSetError }, #[snafu(display("Error converting time series into gRPC response: {}", source))] - ConvertingSeriesSet { - source: crate::server::rpc::data::Error, - }, + ConvertingSeriesSet { source: super::data::Error }, #[snafu(display("Converting field information series into gRPC response: {}", source))] - ConvertingFieldList { - source: crate::server::rpc::data::Error, - }, + ConvertingFieldList { source: super::data::Error }, #[snafu(display("Error sending results via channel: {}", source))] SendingResults { @@ -1159,7 +1152,7 @@ where #[cfg(test)] mod tests { - use crate::server::rpc::id::ID; + use super::super::id::ID; use super::*; use arrow_deps::arrow::datatypes::DataType; @@ -1187,8 +1180,8 @@ mod tests { use futures::prelude::*; use generated_types::{ - aggregate::AggregateType, i_ox_testing_client, read_response::frame, storage_client, - Aggregate as RPCAggregate, Duration as RPCDuration, ReadSource, Window as RPCWindow, + aggregate::AggregateType, i_ox_testing_client, node, read_response::frame, storage_client, + Aggregate as RPCAggregate, Duration as RPCDuration, Node, ReadSource, Window as RPCWindow, }; use prost::Message; @@ -1729,7 +1722,7 @@ mod tests { // Note we don't include the actual line / column in the // expected panic message to avoid needing to update the test // whenever the source code file changed. - let expected_error = "panicked at 'This is a test panic', src/server/rpc/service.rs"; + let expected_error = "panicked at 'This is a test panic', src/influxdb_ioxd/rpc/service.rs"; assert!( captured_logs.contains(expected_error), "Logs did not contain expected panic message '{}'. They were\n{}", diff --git a/src/main.rs b/src/main.rs index 9113518748..62e40e87aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,17 +13,15 @@ use structopt::StructOpt; use tokio::runtime::Runtime; use tracing::{debug, error, info, warn}; -pub mod server; - mod commands { pub mod config; pub mod convert; pub mod file_meta; - pub mod influxdb_ioxd; mod input; pub mod logging; pub mod stats; } +pub mod influxdb_ioxd; use commands::{config::Config, logging::LoggingLevel}; @@ -197,8 +195,7 @@ async fn dispatch_args(matches: ArgMatches<'_>) { // Note don't set up basic logging here, different logging rules appy in server // mode let res = - commands::influxdb_ioxd::main(logging_level, Some(Config::from_clap(sub_matches))) - .await; + influxdb_ioxd::main(logging_level, Some(Config::from_clap(sub_matches))).await; if let Err(e) = res { error!("Server shutdown with error: {}", e); @@ -209,7 +206,7 @@ async fn dispatch_args(matches: ArgMatches<'_>) { (_, _) => { // Note don't set up basic logging here, different logging rules appy in server // mode - let res = commands::influxdb_ioxd::main(logging_level, None).await; + let res = influxdb_ioxd::main(logging_level, None).await; if let Err(e) = res { error!("Server shutdown with error: {}", e); std::process::exit(ReturnCode::ServerExitedAbnormally as _);