chore: merge main to branch

pull/24376/head
NGA-TRAN 2022-01-24 12:06:23 -05:00
commit 797ba459b9
49 changed files with 1574 additions and 475 deletions

Cargo.lock (generated)
View File

@ -1726,7 +1726,9 @@ dependencies = [
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_storage_client",
"ingester",
"internal_types",
"iox_catalog",
"iox_object_store",
"itertools",
"job_registry",
@ -1861,13 +1863,16 @@ dependencies = [
"arrow_util",
"data_types",
"datafusion 0.1.0",
"hyper",
"iox_catalog",
"metric",
"mutable_batch",
"parking_lot",
"predicate",
"query",
"schema",
"snafu",
"thiserror",
"tokio",
"uuid",
"workspace-hack",
@ -3216,6 +3221,7 @@ dependencies = [
"observability_deps",
"ordered-float 2.10.0",
"regex",
"regex-syntax",
"schema",
"serde_json",
"snafu",
@ -3760,6 +3766,27 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "rskafka"
version = "0.1.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=63cf0a541b864d5376b2f96537a5bbb610d0e4bc#63cf0a541b864d5376b2f96537a5bbb610d0e4bc"
dependencies = [
"async-trait",
"bytes",
"crc",
"futures",
"parking_lot",
"pin-project-lite",
"rand",
"rustls",
"thiserror",
"time 0.3.5",
"tokio",
"tokio-rustls",
"tracing",
"varint-rs",
]
[[package]]
name = "rusoto_core"
version = "0.47.0"
@ -4653,6 +4680,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad"
dependencies = [
"libc",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
@ -5132,6 +5168,12 @@ dependencies = [
"getrandom",
]
[[package]]
name = "varint-rs"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -5454,6 +5496,7 @@ dependencies = [
"pin-project",
"prost",
"rdkafka",
"rskafka",
"tempfile",
"time 0.1.0",
"tokio",

View File

@ -169,7 +169,7 @@ mod tests {
#[test]
fn test_expr_to_sql_no_expressions() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![],
};
assert_eq!(&pred.expr_sql_string(), "");
@ -178,7 +178,7 @@ mod tests {
#[test]
fn test_expr_to_sql_operators() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
@ -198,7 +198,7 @@ mod tests {
#[test]
fn test_expr_to_sql_column_escape() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col 1"),
@ -226,7 +226,7 @@ mod tests {
#[test]
fn test_expr_to_sql_bool() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
@ -246,7 +246,7 @@ mod tests {
#[test]
fn test_expr_to_sql_i64() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
@ -284,7 +284,7 @@ mod tests {
#[test]
fn test_expr_to_sql_f64() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),
@ -327,7 +327,7 @@ mod tests {
#[test]
fn test_expr_to_sql_string() {
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![
DeleteExpr {
column: String::from("col1"),

View File

@ -36,15 +36,17 @@ pub const MAX_NANO_TIME: i64 = i64::MAX - 1;
/// ```
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)]
pub struct TimestampRange {
/// Start defines the inclusive lower bound.
pub start: i64,
/// End defines the exclusive upper bound.
pub end: i64,
/// Start defines the inclusive lower bound. Minimum value is [MIN_NANO_TIME]
start: i64,
/// End defines the exclusive upper bound. Maximum value is [MAX_NANO_TIME]
end: i64,
}
impl TimestampRange {
pub fn new(start: i64, end: i64) -> Self {
debug_assert!(end >= start);
let start = start.max(MIN_NANO_TIME);
let end = end.min(MAX_NANO_TIME);
Self { start, end }
}
@ -59,6 +61,16 @@ impl TimestampRange {
pub fn contains_opt(&self, v: Option<i64>) -> bool {
Some(true) == v.map(|ts| self.contains(ts))
}
/// Return the timestamp range's end.
pub fn end(&self) -> i64 {
self.end
}
/// Return the timestamp range's start.
pub fn start(&self) -> i64 {
self.start
}
}
/// Specifies a min/max timestamp value.
@ -93,6 +105,40 @@ impl TimestampMinMax {
mod tests {
use super::*;
#[test]
fn test_timestamp_nano_min_max() {
let cases = vec![
(
"MIN/MAX Nanos",
TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME),
),
("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)),
];
for (name, range) in cases {
println!("case: {}", name);
assert!(!range.contains(i64::MIN));
assert!(range.contains(MIN_NANO_TIME));
assert!(range.contains(MIN_NANO_TIME + 1));
assert!(range.contains(MAX_NANO_TIME - 1));
assert!(!range.contains(MAX_NANO_TIME));
assert!(!range.contains(i64::MAX));
}
}
#[test]
fn test_timestamp_i64_min_max_offset() {
let range = TimestampRange::new(MIN_NANO_TIME + 1, MAX_NANO_TIME - 1);
assert!(!range.contains(i64::MIN));
assert!(!range.contains(MIN_NANO_TIME));
assert!(range.contains(MIN_NANO_TIME + 1));
assert!(range.contains(MAX_NANO_TIME - 2));
assert!(!range.contains(MAX_NANO_TIME - 1));
assert!(!range.contains(MAX_NANO_TIME));
assert!(!range.contains(i64::MAX));
}
#[test]
fn test_timestamp_range_contains() {
let range = TimestampRange::new(100, 200);
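For orientation (not part of the diff), a minimal sketch of the new constructor and accessors, assuming the `data_types::timestamp` module path; the clamping behaviour follows the tests above:
use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
fn main() {
    // Out-of-range bounds are clamped into the valid nanosecond window.
    let range = TimestampRange::new(i64::MIN, i64::MAX);
    assert_eq!(range.start(), MIN_NANO_TIME);
    assert_eq!(range.end(), MAX_NANO_TIME);
    // The contract is unchanged: start is inclusive, end is exclusive.
    assert!(range.contains(MIN_NANO_TIME));
    assert!(!range.contains(MAX_NANO_TIME));
}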

View File

@ -1170,7 +1170,7 @@ mod tests {
// Build delete predicate and expected output
let del_pred1 = Arc::new(DeletePredicate {
range: TimestampRange { start: 0, end: 100 },
range: TimestampRange::new(0, 100),
exprs: vec![DeleteExpr::new(
"city".to_string(),
data_types::delete_predicate::Op::Eq,
@ -1192,7 +1192,7 @@ mod tests {
// let add more delete predicate = simulate second delete
// Build delete predicate and expected output
let del_pred2 = Arc::new(DeletePredicate {
range: TimestampRange { start: 20, end: 50 },
range: TimestampRange::new(20, 50),
exprs: vec![DeleteExpr::new(
"cost".to_string(),
data_types::delete_predicate::Op::Ne,

View File

@ -255,10 +255,7 @@ mod tests {
// Cannot simply use empty predicate (#2687)
let predicate = Arc::new(DeletePredicate {
range: TimestampRange {
start: 0,
end: 1_000,
},
range: TimestampRange::new(0, 1_000),
exprs: vec![],
});
@ -288,10 +285,7 @@ mod tests {
write_lp(db.as_ref(), "cpu foo=3 20");
write_lp(db.as_ref(), "cpu foo=4 20");
let range = TimestampRange {
start: 0,
end: 1_000,
};
let range = TimestampRange::new(0, 1_000);
let pred1 = Arc::new(DeletePredicate {
range,
exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))],

View File

@ -811,7 +811,7 @@ mod tests {
// Create 3 delete predicates that will delete all cookies in 3 different deletes
let pred1 = Arc::new(DeletePredicate {
range: TimestampRange { start: 0, end: 11 },
range: TimestampRange::new(0, 11),
exprs: vec![DeleteExpr::new(
"tag1".to_string(),
data_types::delete_predicate::Op::Eq,
@ -819,7 +819,7 @@ mod tests {
)],
});
let pred2 = Arc::new(DeletePredicate {
range: TimestampRange { start: 12, end: 21 },
range: TimestampRange::new(12, 21),
exprs: vec![DeleteExpr::new(
"tag1".to_string(),
data_types::delete_predicate::Op::Eq,
@ -827,7 +827,7 @@ mod tests {
)],
});
let pred3 = Arc::new(DeletePredicate {
range: TimestampRange { start: 22, end: 31 },
range: TimestampRange::new(22, 31),
exprs: vec![DeleteExpr::new(
"tag1".to_string(),
data_types::delete_predicate::Op::Eq,
@ -976,7 +976,7 @@ mod tests {
// Delete all
let predicate = Arc::new(DeletePredicate {
range: TimestampRange { start: 0, end: 30 },
range: TimestampRange::new(0, 30),
exprs: vec![],
});
db.delete("cpu", predicate).unwrap();

View File

@ -324,7 +324,7 @@ mod tests {
// Delete first row
let predicate = Arc::new(DeletePredicate {
range: TimestampRange { start: 0, end: 20 },
range: TimestampRange::new(0, 20),
exprs: vec![],
});
@ -388,10 +388,7 @@ mod tests {
// Delete everything
let predicate = Arc::new(DeletePredicate {
range: TimestampRange {
start: 0,
end: 1_000,
},
range: TimestampRange::new(0, 1_000),
exprs: vec![],
});
@ -430,10 +427,7 @@ mod tests {
// Cannot simply use empty predicate (#2687)
let predicate = Arc::new(DeletePredicate {
range: TimestampRange {
start: 0,
end: 1_000,
},
range: TimestampRange::new(0, 1_000),
exprs: vec![],
});
@ -494,10 +488,7 @@ mod tests {
write_lp(db.as_ref(), "cpu foo=3 20");
write_lp(db.as_ref(), "cpu foo=4 20");
let range = TimestampRange {
start: 0,
end: 1_000,
};
let range = TimestampRange::new(0, 1_000);
let pred1 = Arc::new(DeletePredicate {
range,
exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))],

View File

@ -37,7 +37,7 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result<read_buffer::Pr
// InfluxDB-specific expressions on the time column.
Ok(match predicate.range {
Some(range) => {
read_buffer::Predicate::with_time_range(&exprs, range.start, range.end)
read_buffer::Predicate::with_time_range(&exprs, range.start(), range.end())
}
None => read_buffer::Predicate::new(exprs),
})

View File

@ -2653,7 +2653,7 @@ mod tests {
sequence_number: 1,
table_name: None,
predicate: DeletePredicate {
range: TimestampRange { start: 0, end: 20 },
range: TimestampRange::new(0, 20),
exprs: vec![],
},
}]),
@ -2707,7 +2707,7 @@ mod tests {
sequence_number: 1,
table_name: None,
predicate: DeletePredicate {
range: TimestampRange { start: 0, end: 11 },
range: TimestampRange::new(0, 11),
exprs: vec![],
},
}]),
@ -2817,7 +2817,7 @@ mod tests {
sequence_number: 2,
table_name: Some("table_1"),
predicate: DeletePredicate {
range: TimestampRange { start: 19, end: 21 },
range: TimestampRange::new(19, 21),
exprs: vec![],
},
}]),

View File

@ -593,7 +593,7 @@ mod tests {
let meta = DmlMeta::unsequenced(None);
let delete = DmlDelete::new(
DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![],
},
None,
@ -615,7 +615,7 @@ mod tests {
let meta = DmlMeta::unsequenced(None);
let delete = DmlDelete::new(
DeletePredicate {
range: TimestampRange { start: 3, end: 4 },
range: TimestampRange::new(3, 4),
exprs: vec![],
},
Some(NonEmptyString::new("some_foo").unwrap()),
@ -630,7 +630,7 @@ mod tests {
let meta = DmlMeta::unsequenced(None);
let delete = DmlDelete::new(
DeletePredicate {
range: TimestampRange { start: 5, end: 6 },
range: TimestampRange::new(5, 6),
exprs: vec![],
},
Some(NonEmptyString::new("bar").unwrap()),

View File

@ -20,8 +20,8 @@ impl From<DeletePredicate> for proto::Predicate {
fn from(predicate: DeletePredicate) -> Self {
proto::Predicate {
range: Some(proto::TimestampRange {
start: predicate.range.start,
end: predicate.range.end,
start: predicate.range.start(),
end: predicate.range.end(),
}),
exprs: predicate.exprs.into_iter().map(Into::into).collect(),
}
@ -35,10 +35,7 @@ impl TryFrom<proto::Predicate> for DeletePredicate {
let range = value.range.unwrap_field("range")?;
Ok(Self {
range: TimestampRange {
start: range.start,
end: range.end,
},
range: TimestampRange::new(range.start, range.end),
exprs: value.exprs.repeated("exprs")?,
})
}
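A short round-trip sketch (not part of the diff; written against the same `proto` alias and imports used in this file) showing the conversions now flowing through the accessors:
let pred = DeletePredicate {
    range: TimestampRange::new(1, 2),
    exprs: vec![],
};
// Encode to the protobuf representation and decode it back.
let encoded: proto::Predicate = pred.into();
assert_eq!(encoded.range.as_ref().map(|r| (r.start, r.end)), Some((1, 2)));
let decoded = DeletePredicate::try_from(encoded).expect("valid predicate");
assert_eq!(decoded.range.start(), 1);
assert_eq!(decoded.range.end(), 2);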

View File

@ -14,7 +14,9 @@ dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ingester = { path = "../ingester" }
internal_types = { path = "../internal_types" }
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
job_registry = { path = "../job_registry" }
logfmt = { path = "../logfmt" }

View File

@ -0,0 +1,121 @@
//! Implementation of the command line option for running the ingester
use std::sync::Arc;
use crate::{
clap_blocks::run_config::RunConfig,
influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
ingester::IngesterServerType,
},
},
};
use ingester::handler::IngestHandlerImpl;
use ingester::server::grpc::GrpcDelegate;
use ingester::server::{http::HttpDelegate, IngesterServer};
use iox_catalog::interface::{Catalog, KafkaPartition};
use iox_catalog::postgres::PostgresCatalog;
use observability_deps::tracing::*;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] influxdb_ioxd::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("Kafka topic {0} not found in the catalog")]
KafkaTopicNotFound(String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in ingester mode",
long_about = "Run the IOx ingester server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
/// Postgres connection string
#[clap(env = "INFLUXDB_IOX_CATALOG_DNS")]
pub catalog_dsn: String,
/// Kafka connection string
#[clap(env = "INFLUXDB_IOX_KAFKA_CONNECTION")]
pub kafka_connection: String,
/// Kafka topic name
#[clap(env = "INFLUXDB_IOX_KAFKA_TOPIC")]
pub kafka_topic: String,
/// Kafka partition number to start (inclusive) range with
#[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_START")]
pub kafka_partition_range_start: i32,
/// Kafka partition number to end (inclusive) range with
#[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_END")]
pub kafka_partition_range_end: i32,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let catalog: Arc<dyn Catalog> = Arc::new(
PostgresCatalog::connect(
"ingester",
iox_catalog::postgres::SCHEMA_NAME,
&config.catalog_dsn,
)
.await?,
);
let kafka_topic = match catalog
.kafka_topics()
.get_by_name(&config.kafka_topic)
.await?
{
Some(k) => k,
None => return Err(Error::KafkaTopicNotFound(config.kafka_topic)),
};
let kafka_partitions: Vec<_> = (config.kafka_partition_range_start
..config.kafka_partition_range_end)
.map(KafkaPartition::new)
.collect();
let ingest_handler = Arc::new(IngestHandlerImpl::new(
kafka_topic,
kafka_partitions,
catalog,
));
let http = HttpDelegate::new(Arc::clone(&ingest_handler));
let grpc = GrpcDelegate::new(ingest_handler);
let ingester = IngesterServer::new(http, grpc);
let server_type = Arc::new(IngesterServerType::new(ingester, &common_state));
info!("starting ingester");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
}
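A small sketch (not part of the commit, with hypothetical bounds) of how the partition flags expand into `KafkaPartition` values; note that the `start..end` range used above is half-open, so the end partition itself is not produced:
use iox_catalog::interface::KafkaPartition;
fn main() {
    // Hypothetical values for the two range flags.
    let (start, end) = (0, 3);
    // Mirrors the construction in `command()` above.
    let partitions: Vec<KafkaPartition> = (start..end).map(KafkaPartition::new).collect();
    // Produces partitions 0, 1 and 2; an inclusive end would need `start..=end`.
    assert_eq!(partitions.len(), 3);
}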

View File

@ -3,6 +3,7 @@ use snafu::{ResultExt, Snafu};
use crate::clap_blocks::run_config::RunConfig;
pub mod database;
pub mod ingester;
pub mod router;
pub mod router2;
pub mod test;
@ -19,6 +20,9 @@ pub enum Error {
#[snafu(display("Error in router2 subcommand: {}", source))]
Router2Error { source: router2::Error },
#[snafu(display("Error in ingester subcommand: {}", source))]
IngesterError { source: ingester::Error },
#[snafu(display("Error in test subcommand: {}", source))]
TestError { source: test::Error },
}
@ -43,6 +47,7 @@ impl Config {
Some(Command::Database(config)) => &config.run_config,
Some(Command::Router(config)) => &config.run_config,
Some(Command::Router2(config)) => &config.run_config,
Some(Command::Ingester(config)) => &config.run_config,
Some(Command::Test(config)) => &config.run_config,
}
}
@ -59,6 +64,9 @@ enum Command {
/// Run the server in router2 mode
Router2(router2::Config),
/// Run the server in ingester mode
Ingester(ingester::Config),
/// Run the server in test mode
Test(test::Config),
}
@ -76,6 +84,7 @@ pub async fn command(config: Config) -> Result<()> {
Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
}
}

View File

@ -0,0 +1,100 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use ingester::server::IngesterServer;
use metric::Registry;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use crate::influxdb_ioxd::{
http::error::{HttpApiError, HttpApiErrorSource},
rpc::RpcBuilderInput,
server_type::{common_state::CommonServerState, RpcError, ServerType},
};
use ingester::handler::IngestHandler;
#[derive(Debug)]
pub struct IngesterServerType<I: IngestHandler> {
server: IngesterServer<I>,
shutdown: CancellationToken,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<I: IngestHandler> IngesterServerType<I> {
pub fn new(server: IngesterServer<I>, common_state: &CommonServerState) -> Self {
Self {
server,
shutdown: CancellationToken::new(),
trace_collector: common_state.trace_collector(),
}
}
}
#[async_trait]
impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterServerType<I> {
type RouteError = IoxHttpErrorAdaptor;
/// Return the [`metric::Registry`] used by the ingester.
fn metric_registry(&self) -> Arc<Registry> {
self.server.metric_registry()
}
/// Returns the trace collector for ingester traces.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.trace_collector.as_ref().map(Arc::clone)
}
/// Dispatches `req` to the ingester [`HttpDelegate`] delegate.
///
/// [`HttpDelegate`]: ingester::server::http::HttpDelegate
async fn route_http_request(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Self::RouteError> {
unimplemented!();
}
/// Registers the services exposed by the ingester [`GrpcDelegate`] delegate.
///
/// [`GrpcDelegate`]: ingester::server::grpc::GrpcDelegate
async fn server_grpc(self: Arc<Self>, _builder_input: RpcBuilderInput) -> Result<(), RpcError> {
unimplemented!()
// let builder = setup_builder!(builder_input, self);
// add_service!(builder, self.server.grpc().write_service());
// serve_builder!(builder);
//
// Ok(())
}
async fn join(self: Arc<Self>) {
self.shutdown.cancelled().await;
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}
/// This adaptor converts the `ingester` http error type into a type that
/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the
/// two decoupled.
#[derive(Debug)]
pub struct IoxHttpErrorAdaptor(router2::server::http::Error);
impl Display for IoxHttpErrorAdaptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
impl std::error::Error for IoxHttpErrorAdaptor {}
impl HttpApiErrorSource for IoxHttpErrorAdaptor {
fn to_http_api_error(&self) -> HttpApiError {
HttpApiError::new(self.0.as_status_code(), self.to_string())
}
}

View File

@ -10,6 +10,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput
pub mod common_state;
pub mod database;
pub mod ingester;
pub mod router;
pub mod router2;
pub mod test;

View File

@ -177,7 +177,7 @@ mod tests {
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
let predicate = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![DeleteExpr {
column: String::from("foo"),
op: data_types::delete_predicate::Op::Eq,

View File

@ -66,10 +66,7 @@ async fn test_delete_on_database() {
// Delete some data
let table = "cpu";
let pred = DeletePredicate {
range: TimestampRange {
start: 100,
end: 120,
},
range: TimestampRange::new(100, 120),
exprs: vec![DeleteExpr {
column: String::from("region"),
op: data_types::delete_predicate::Op::Eq,
@ -169,10 +166,7 @@ pub async fn test_delete_on_router() {
let table = "cpu";
let pred = DeletePredicate {
range: TimestampRange {
start: 100,
end: 120,
},
range: TimestampRange::new(100, 120),
exprs: vec![DeleteExpr {
column: String::from("region"),
op: data_types::delete_predicate::Op::Eq,

View File

@ -9,7 +9,9 @@ arrow = { version = "7.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch"}
parking_lot = "0.11.2"
predicate = { path = "../predicate" }
@ -17,5 +19,6 @@ query = { path = "../query" }
schema = { path = "../schema" }
snafu = "0.7"
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
thiserror = "1.0"
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}

View File

@ -6,10 +6,10 @@ use data_types::delete_predicate::DeletePredicate;
use std::{collections::BTreeMap, sync::Arc};
use uuid::Uuid;
use crate::server::IngesterServer;
use crate::handler::IngestHandlerImpl;
use iox_catalog::interface::{
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber,
SequencerId, TableId, Tombstone,
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId,
Tombstone,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
@ -55,11 +55,9 @@ pub struct Sequencers {
impl Sequencers {
/// One time initialize Sequencers of this Ingester
pub async fn initialize<T: RepoCollection + Send + Sync>(
ingester: &IngesterServer<'_, T>,
) -> Result<Self> {
pub async fn initialize(ingester: &IngestHandlerImpl) -> Result<Self> {
// Get sequencer ids from the catalog
let sequencer_repro = ingester.iox_catalog.sequencer();
let sequencer_repro = ingester.iox_catalog.sequencers();
let mut sequencers = BTreeMap::default();
let topic = ingester.get_topic();
for shard in ingester.get_kafka_partitions() {

ingester/src/handler.rs (new file)
View File

@ -0,0 +1,62 @@
//! Ingest handler
use std::sync::Arc;
use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId};
use std::fmt::Formatter;
/// The [`IngestHandler`] handles all ingest from Kafka, persistence, and queries
pub trait IngestHandler {}
/// Implementation of the `IngestHandler` trait to ingest from Kafka, manage persistence, and answer queries
pub struct IngestHandlerImpl {
/// Kafka Topic assigned to this ingester
kafka_topic: KafkaTopic,
/// Kafka Partitions (Shards) assigned to this ingester
kafka_partitions: Vec<KafkaPartition>,
/// Catalog of this ingester
pub iox_catalog: Arc<dyn Catalog>,
}
impl std::fmt::Debug for IngestHandlerImpl {
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
impl IngestHandlerImpl {
/// Initialize the Ingester
pub fn new(
topic: KafkaTopic,
shard_ids: Vec<KafkaPartition>,
catalog: Arc<dyn Catalog>,
) -> Self {
Self {
kafka_topic: topic,
kafka_partitions: shard_ids,
iox_catalog: catalog,
}
}
/// Return a kafka topic
pub fn get_topic(&self) -> KafkaTopic {
self.kafka_topic.clone()
}
/// Return a kafka topic id
pub fn get_topic_id(&self) -> KafkaTopicId {
self.kafka_topic.id
}
/// Return a kafka topic name
pub fn get_topic_name(&self) -> String {
self.kafka_topic.name.clone()
}
/// Return Kafka Partitions
pub fn get_kafka_partitions(&self) -> Vec<KafkaPartition> {
self.kafka_partitions.clone()
}
}
impl IngestHandler for IngestHandlerImpl {}
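A hedged, test-style usage sketch (not part of the diff) wiring the handler to the catalog trait; the `iox_catalog::mem::MemCatalog` path and the tokio test harness are assumptions:
use std::sync::Arc;
use ingester::handler::IngestHandlerImpl;
use iox_catalog::interface::{Catalog, KafkaPartition};
use iox_catalog::mem::MemCatalog;
#[tokio::test]
async fn handler_exposes_topic_and_partitions() {
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
    // Create (or fetch) the topic through the catalog, as the run command does.
    let topic = catalog
        .kafka_topics()
        .create_or_get("example_topic")
        .await
        .unwrap();
    let partitions = vec![KafkaPartition::new(0), KafkaPartition::new(1)];
    let handler = IngestHandlerImpl::new(topic, partitions, Arc::clone(&catalog));
    assert_eq!(handler.get_topic_name(), "example_topic");
    assert_eq!(handler.get_kafka_partitions().len(), 2);
}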

View File

@ -16,4 +16,5 @@
pub mod compact;
pub mod data;
pub mod query;
pub mod handler;
pub mod server;

View File

@ -1,54 +1,49 @@
//! Ingester Server
//!
//! Ingester server entrypoint.
use std::sync::Arc;
use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection};
use self::{grpc::GrpcDelegate, http::HttpDelegate};
use crate::handler::IngestHandler;
use std::fmt::Debug;
/// The [`IngesterServer`] manages the lifecycle and contains all state for
/// an `ingester` server instance.
pub struct IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
{
/// Kafka Topic assigned to this ingester
kafka_topic: KafkaTopic,
/// Kafka Partitions (Shards) assigned to this ingester
kafka_partitions: Vec<KafkaPartition>,
/// Catalog of this ingester
pub iox_catalog: &'a Arc<T>,
pub mod grpc;
pub mod http;
/// The [`IngesterServer`] manages the lifecycle and contains all state for an
/// `ingester` server instance.
#[derive(Debug, Default)]
pub struct IngesterServer<I: IngestHandler> {
metrics: Arc<metric::Registry>,
http: HttpDelegate<I>,
grpc: GrpcDelegate<I>,
}
impl<'a, T> IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
{
/// Initialize the Ingester
pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
impl<I: IngestHandler> IngesterServer<I> {
/// Initialise a new [`IngesterServer`] using the provided HTTP and gRPC
/// handlers.
pub fn new(http: HttpDelegate<I>, grpc: GrpcDelegate<I>) -> Self {
Self {
kafka_topic: topic,
kafka_partitions: shard_ids,
iox_catalog: catalog,
metrics: Default::default(),
http,
grpc,
}
}
/// Return a kafka topic
pub fn get_topic(&self) -> KafkaTopic {
self.kafka_topic.clone()
}
/// Return a kafka topic id
pub fn get_topic_id(&self) -> KafkaTopicId {
self.kafka_topic.id
}
/// Return a kafka topic name
pub fn get_topic_name(&self) -> String {
self.kafka_topic.name.clone()
}
/// Return Kafka Partitions
pub fn get_kafka_partitions(&self) -> Vec<KafkaPartition> {
self.kafka_partitions.clone()
/// Return the [`metric::Registry`] used by the ingester.
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metrics)
}
}
impl<I: IngestHandler + Debug> IngesterServer<I> {
/// Get a reference to the ingester http delegate.
pub fn http(&self) -> &HttpDelegate<I> {
&self.http
}
/// Get a reference to the ingester grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<I> {
&self.grpc
}
}

View File

@ -0,0 +1,20 @@
//! gRPC service implementations for `ingester`.
use crate::handler::IngestHandler;
use std::sync::Arc;
/// This type is responsible for managing all gRPC services exposed by
/// `ingester`.
#[derive(Debug, Default)]
pub struct GrpcDelegate<I: IngestHandler> {
#[allow(dead_code)]
ingest_handler: Arc<I>,
}
impl<I: IngestHandler> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the
/// specified `ingest_handler`.
pub fn new(ingest_handler: Arc<I>) -> Self {
Self { ingest_handler }
}
}

View File

@ -0,0 +1,50 @@
//! HTTP service implementations for `ingester`.
use crate::handler::IngestHandler;
use hyper::{Body, Request, Response, StatusCode};
use std::sync::Arc;
use thiserror::Error;
/// Errors returned by the `ingester` HTTP request handler.
#[derive(Debug, Error, Copy, Clone)]
pub enum Error {
/// The requested path has no registered handler.
#[error("not found")]
NotFound,
}
impl Error {
/// Convert the error into an appropriate [`StatusCode`] to be returned to
/// the end user.
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
}
}
}
/// This type is responsible for servicing requests to the `ingester` HTTP
/// endpoint.
///
/// Requests to some paths may be handled externally by the caller - the IOx
/// server runner framework takes care of implementing the health endpoint,
/// metrics, pprof, etc.
#[derive(Debug, Default)]
pub struct HttpDelegate<I: IngestHandler> {
#[allow(dead_code)]
ingest_handler: Arc<I>,
}
impl<I: IngestHandler> HttpDelegate<I> {
/// Initialise a new [`HttpDelegate`] passing valid requests to the
/// specified `ingest_handler`.
pub fn new(ingest_handler: Arc<I>) -> Self {
Self { ingest_handler }
}
/// Routes `req` to the appropriate handler, if any, returning the handler
/// response.
pub fn route(&self, _req: Request<Body>) -> Result<Response<Body>, Error> {
unimplemented!()
}
}

View File

@ -6,7 +6,6 @@ use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -247,32 +246,32 @@ impl std::fmt::Display for ParquetFileId {
}
}
/// Container that can return repos for each of the catalog data types.
/// Trait that contains methods for working with the catalog
#[async_trait]
pub trait RepoCollection {
pub trait Catalog: Send + Sync {
/// repo for kafka topics
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send>;
fn kafka_topics(&self) -> &dyn KafkaTopicRepo;
/// repo for query pools
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send>;
fn query_pools(&self) -> &dyn QueryPoolRepo;
/// repo for namespaces
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send>;
fn namespaces(&self) -> &dyn NamespaceRepo;
/// repo for tables
fn table(&self) -> Arc<dyn TableRepo + Sync + Send>;
fn tables(&self) -> &dyn TableRepo;
/// repo for columns
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send>;
fn columns(&self) -> &dyn ColumnRepo;
/// repo for sequencers
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send>;
fn sequencers(&self) -> &dyn SequencerRepo;
/// repo for partitions
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send>;
fn partitions(&self) -> &dyn PartitionRepo;
/// repo for tombstones
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send>;
fn tombstones(&self) -> &dyn TombstoneRepo;
/// repo for parquet_files
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send>;
fn parquet_files(&self) -> &dyn ParquetFileRepo;
}
/// Functions for working with Kafka topics in the catalog.
#[async_trait]
pub trait KafkaTopicRepo {
pub trait KafkaTopicRepo: Send + Sync {
/// Creates the kafka topic in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<KafkaTopic>;
@ -282,14 +281,14 @@ pub trait KafkaTopicRepo {
/// Functions for working with query pools in the catalog.
#[async_trait]
pub trait QueryPoolRepo {
pub trait QueryPoolRepo: Send + Sync {
/// Creates the query pool in the catalog or gets the existing record by name.
async fn create_or_get(&self, name: &str) -> Result<QueryPool>;
}
/// Functions for working with namespaces in the catalog
#[async_trait]
pub trait NamespaceRepo {
pub trait NamespaceRepo: Send + Sync {
/// Creates the namespace in the catalog. If one by the same name already exists, an
/// error is returned.
async fn create(
@ -306,7 +305,7 @@ pub trait NamespaceRepo {
/// Functions for working with tables in the catalog
#[async_trait]
pub trait TableRepo {
pub trait TableRepo: Send + Sync {
/// Creates the table in the catalog or get the existing record by name.
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table>;
@ -316,7 +315,7 @@ pub trait TableRepo {
/// Functions for working with columns in the catalog
#[async_trait]
pub trait ColumnRepo {
pub trait ColumnRepo: Send + Sync {
/// Creates the column in the catalog or returns the existing column. Will return a
/// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type
/// the caller is attempting to create.
@ -333,7 +332,7 @@ pub trait ColumnRepo {
/// Functions for working with sequencers in the catalog
#[async_trait]
pub trait SequencerRepo {
pub trait SequencerRepo: Send + Sync {
/// create a sequencer record for the kafka topic and partition or return the existing record
async fn create_or_get(
&self,
@ -358,7 +357,7 @@ pub trait SequencerRepo {
/// Functions for working with IOx partitions in the catalog. Note that these are how
/// IOx splits up data within a database, which is different from Kafka partitions.
#[async_trait]
pub trait PartitionRepo {
pub trait PartitionRepo: Send + Sync {
/// create or get a partition record for the given partition key, sequencer and table
async fn create_or_get(
&self,
@ -373,7 +372,7 @@ pub trait PartitionRepo {
/// Functions for working with tombstones in the catalog
#[async_trait]
pub trait TombstoneRepo {
pub trait TombstoneRepo: Send + Sync {
/// create or get a tombstone
async fn create_or_get(
&self,
@ -397,7 +396,7 @@ pub trait TombstoneRepo {
/// Functions for working with parquet file pointers in the catalog
#[async_trait]
pub trait ParquetFileRepo {
pub trait ParquetFileRepo: Send + Sync {
/// create the parquet file
#[allow(clippy::too_many_arguments)]
async fn create(
@ -519,22 +518,19 @@ impl NamespaceSchema {
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_name<T: RepoCollection + Send + Sync>(
pub async fn get_schema_by_name(
name: &str,
repo: &T,
catalog: &dyn Catalog,
) -> Result<Option<NamespaceSchema>> {
let namespace_repo = repo.namespace();
let table_repo = repo.table();
let column_repo = repo.column();
let namespace = namespace_repo
let namespace = catalog
.namespaces()
.get_by_name(name)
.await?
.context(NamespaceNotFoundSnafu { name })?;
// get the columns first just in case someone else is creating schema while we're doing this.
let columns = column_repo.list_by_namespace_id(namespace.id).await?;
let tables = table_repo.list_by_namespace_id(namespace.id).await?;
let columns = catalog.columns().list_by_namespace_id(namespace.id).await?;
let tables = catalog.tables().list_by_namespace_id(namespace.id).await?;
let mut namespace = NamespaceSchema::new(
namespace.id,
@ -813,25 +809,22 @@ pub struct ParquetFile {
pub(crate) mod test_helpers {
use super::*;
use futures::{stream::FuturesOrdered, StreamExt};
use std::sync::Arc;
pub(crate) async fn test_repo<T, F>(new_repo: F)
where
T: RepoCollection + Send + Sync,
F: Fn() -> T + Send + Sync,
{
test_kafka_topic(&new_repo()).await;
test_query_pool(&new_repo()).await;
test_namespace(&new_repo()).await;
test_table(&new_repo()).await;
test_column(&new_repo()).await;
test_sequencer(&new_repo()).await;
test_partition(&new_repo()).await;
test_tombstone(&new_repo()).await;
test_parquet_file(&new_repo()).await;
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_kafka_topic(Arc::clone(&catalog)).await;
test_query_pool(Arc::clone(&catalog)).await;
test_namespace(Arc::clone(&catalog)).await;
test_table(Arc::clone(&catalog)).await;
test_column(Arc::clone(&catalog)).await;
test_sequencer(Arc::clone(&catalog)).await;
test_partition(Arc::clone(&catalog)).await;
test_tombstone(Arc::clone(&catalog)).await;
test_parquet_file(Arc::clone(&catalog)).await;
}
async fn test_kafka_topic<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka_repo = repo.kafka_topic();
async fn test_kafka_topic(catalog: Arc<dyn Catalog>) {
let kafka_repo = catalog.kafka_topics();
let k = kafka_repo.create_or_get("foo").await.unwrap();
assert!(k.id > KafkaTopicId::new(0));
assert_eq!(k.name, "foo");
@ -843,8 +836,8 @@ pub(crate) mod test_helpers {
assert!(k3.is_none());
}
async fn test_query_pool<T: RepoCollection + Send + Sync>(repo: &T) {
let query_repo = repo.query_pool();
async fn test_query_pool(catalog: Arc<dyn Catalog>) {
let query_repo = catalog.query_pools();
let q = query_repo.create_or_get("foo").await.unwrap();
assert!(q.id > QueryPoolId::new(0));
assert_eq!(q.name, "foo");
@ -852,10 +845,10 @@ pub(crate) mod test_helpers {
assert_eq!(q, q2);
}
async fn test_namespace<T: RepoCollection + Send + Sync>(repo: &T) {
let namespace_repo = repo.namespace();
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
async fn test_namespace(catalog: Arc<dyn Catalog>) {
let namespace_repo = catalog.namespaces();
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace_name = "test_namespace";
let namespace = namespace_repo
@ -881,53 +874,75 @@ pub(crate) mod test_helpers {
assert_eq!(namespace, found);
}
async fn test_table<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_table(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_table_test", "inf", kafka.id, pool.id)
.await
.unwrap();
// test we can create or get a table
let table_repo = repo.table();
let t = table_repo
let t = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let tt = table_repo
let tt = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
assert!(t.id > TableId::new(0));
assert_eq!(t, tt);
let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap();
let tables = catalog
.tables()
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![t], tables);
// test we can create a table of the same name in a different namespace
let namespace2 = catalog
.namespaces()
.create("two", "inf", kafka.id, pool.id)
.await
.unwrap();
assert_ne!(namespace, namespace2);
let test_table = catalog
.tables()
.create_or_get("test_table", namespace2.id)
.await
.unwrap();
assert_ne!(tt, test_table);
assert_eq!(test_table.namespace_id, namespace2.id)
}
async fn test_column<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_column(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_column_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
assert_eq!(table.namespace_id, namespace.id);
// test we can create or get a column
let column_repo = repo.column();
let c = column_repo
let c = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
let cc = column_repo
let cc = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::Tag)
.await
.unwrap();
@ -935,7 +950,8 @@ pub(crate) mod test_helpers {
assert_eq!(c, cc);
// test that attempting to create an already defined column of a different type returns error
let err = column_repo
let err = catalog
.columns()
.create_or_get("column_test", table.id, ColumnType::U64)
.await
.expect_err("should error with wrong column type");
@ -949,35 +965,40 @@ pub(crate) mod test_helpers {
));
// test that we can create a column of the same name under a different table
let table2 = repo
.table()
let table2 = catalog
.tables()
.create_or_get("test_table_2", namespace.id)
.await
.unwrap();
let ccc = column_repo
let ccc = catalog
.columns()
.create_or_get("column_test", table2.id, ColumnType::U64)
.await
.unwrap();
assert_ne!(c, ccc);
let columns = column_repo
let columns = catalog
.columns()
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![c, ccc], columns);
}
async fn test_sequencer<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo
.kafka_topic()
async fn test_sequencer(catalog: Arc<dyn Catalog>) {
let kafka = catalog
.kafka_topics()
.create_or_get("sequencer_test")
.await
.unwrap();
let sequencer_repo = repo.sequencer();
// Create 10 sequencers
let created = (1..=10)
.map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition)))
.map(|partition| {
catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(partition))
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
@ -987,7 +1008,8 @@ pub(crate) mod test_helpers {
.await;
// List them and assert they match
let listed = sequencer_repo
let listed = catalog
.sequencers()
.list_by_kafka_topic(&kafka)
.await
.expect("failed to list sequencers")
@ -999,7 +1021,8 @@ pub(crate) mod test_helpers {
// get by the sequencer id and partition
let kafka_partition = KafkaPartition::new(1);
let sequencer = sequencer_repo
let sequencer = catalog
.sequencers()
.get_by_topic_id_and_partition(kafka.id, kafka_partition)
.await
.unwrap()
@ -1007,42 +1030,45 @@ pub(crate) mod test_helpers {
assert_eq!(kafka.id, sequencer.kafka_topic_id);
assert_eq!(kafka_partition, sequencer.kafka_partition);
let sequencer = sequencer_repo
let sequencer = catalog
.sequencers()
.get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523))
.await
.unwrap();
assert!(sequencer.is_none());
}
async fn test_partition<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_partition(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_partition_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let other_sequencer = repo
.sequencer()
let other_sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(2))
.await
.unwrap();
let partition_repo = repo.partition();
let created = ["foo", "bar"]
.iter()
.map(|key| partition_repo.create_or_get(key, sequencer.id, table.id))
.map(|key| {
catalog
.partitions()
.create_or_get(key, sequencer.id, table.id)
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create partition");
@ -1050,13 +1076,15 @@ pub(crate) mod test_helpers {
})
.collect::<BTreeMap<_, _>>()
.await;
let _ = partition_repo
let _ = catalog
.partitions()
.create_or_get("asdf", other_sequencer.id, table.id)
.await
.unwrap();
// List them and assert they match
let listed = partition_repo
let listed = catalog
.partitions()
.list_by_sequencer(sequencer.id)
.await
.expect("failed to list partitions")
@ -1067,34 +1095,34 @@ pub(crate) mod test_helpers {
assert_eq!(created, listed);
}
async fn test_tombstone<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_tombstone(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_tombstone_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repo
.table()
let other_table = catalog
.tables()
.create_or_get("other", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let tombstone_repo = repo.tombstone();
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let t1 = tombstone_repo
let t1 = catalog
.tombstones()
.create_or_get(
table.id,
sequencer.id,
@ -1111,7 +1139,8 @@ pub(crate) mod test_helpers {
assert_eq!(t1.min_time, min_time);
assert_eq!(t1.max_time, max_time);
assert_eq!(t1.serialized_predicate, "whatevs");
let t2 = tombstone_repo
let t2 = catalog
.tombstones()
.create_or_get(
other_table.id,
sequencer.id,
@ -1122,7 +1151,8 @@ pub(crate) mod test_helpers {
)
.await
.unwrap();
let t3 = tombstone_repo
let t3 = catalog
.tombstones()
.create_or_get(
table.id,
sequencer.id,
@ -1134,43 +1164,44 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let listed = tombstone_repo
let listed = catalog
.tombstones()
.list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
.await
.unwrap();
assert_eq!(vec![t2, t3], listed);
}
async fn test_parquet_file<T: RepoCollection + Send + Sync>(repo: &T) {
let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap();
let pool = repo.query_pool().create_or_get("foo").await.unwrap();
let namespace = repo
.namespace()
async fn test_parquet_file(catalog: Arc<dyn Catalog>) {
let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
let namespace = catalog
.namespaces()
.create("namespace_parquet_file_test", "inf", kafka.id, pool.id)
.await
.unwrap();
let table = repo
.table()
let table = catalog
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repo
.table()
let other_table = catalog
.tables()
.create_or_get("other", namespace.id)
.await
.unwrap();
let sequencer = repo
.sequencer()
let sequencer = catalog
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition = repo
.partition()
let partition = catalog
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let other_partition = repo
.partition()
let other_partition = catalog
.partitions()
.create_or_get("one", sequencer.id, other_table.id)
.await
.unwrap();
@ -1178,7 +1209,7 @@ pub(crate) mod test_helpers {
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
let parquet_repo = repo.parquet_file();
let parquet_repo = catalog.parquet_files();
let parquet_file = parquet_repo
.create(
sequencer.id,

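To make the new trait-object API concrete, a hedged sketch (not part of the diff; `MemCatalog` path and tokio harness assumed) of looking up a schema through `&dyn Catalog`:
use std::sync::Arc;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::mem::MemCatalog;
#[tokio::test]
async fn lookup_schema_through_catalog_trait() {
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
    // Seed a namespace via the repo accessors.
    let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
    let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
    catalog
        .namespaces()
        .create("example_ns", "inf", kafka.id, pool.id)
        .await
        .unwrap();
    // The free function now takes `&dyn Catalog` instead of a generic `RepoCollection`.
    let schema = get_schema_by_name("example_ns", catalog.as_ref())
        .await
        .unwrap();
    assert!(schema.is_some());
}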
View File

@ -12,8 +12,8 @@
)]
use crate::interface::{
column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic,
NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId,
column_type_from_field, Catalog, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic,
NamespaceSchema, QueryPool, Result, Sequencer, SequencerId, TableId,
};
use futures::{stream::FuturesOrdered, StreamExt};
use influxdb_line_protocol::ParsedLine;
@ -36,10 +36,10 @@ pub mod postgres;
/// If another writer attempts to create a column of the same name with a different
/// type at the same time and beats this caller to it, an error will be returned. If another
/// writer adds the same schema before this one, then this will load that schema here.
pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
pub async fn validate_or_insert_schema(
lines: Vec<ParsedLine<'_>>,
schema: &NamespaceSchema,
repo: &T,
catalog: &dyn Catalog,
) -> Result<Option<NamespaceSchema>> {
// table name to table_id
let mut new_tables: BTreeMap<String, TableId> = BTreeMap::new();
@ -66,8 +66,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
None => {
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let column_repo = repo.column();
let column = column_repo
let column = catalog
.columns()
.create_or_get(key.as_str(), table.id, ColumnType::Tag)
.await?;
entry.insert(
@ -97,8 +97,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
let entry = new_columns.entry(table.id).or_default();
if entry.get(key.as_str()).is_none() {
let data_type = column_type_from_field(value);
let column_repo = repo.column();
let column = column_repo
let column = catalog
.columns()
.create_or_get(key.as_str(), table.id, data_type)
.await?;
entry.insert(
@ -113,15 +113,16 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
}
}
None => {
let table_repo = repo.table();
let new_table = table_repo.create_or_get(table_name, schema.id).await?;
let new_table = catalog
.tables()
.create_or_get(table_name, schema.id)
.await?;
let new_table_columns = new_columns.entry(new_table.id).or_default();
let column_repo = repo.column();
if let Some(tagset) = &line.series.tag_set {
for (key, _) in tagset {
let new_column = column_repo
let new_column = catalog
.columns()
.create_or_get(key.as_str(), new_table.id, ColumnType::Tag)
.await?;
new_table_columns.insert(
@ -135,7 +136,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
}
for (key, value) in &line.field_set {
let data_type = column_type_from_field(value);
let new_column = column_repo
let new_column = catalog
.columns()
.create_or_get(key.as_str(), new_table.id, data_type)
.await?;
new_table_columns.insert(
@ -146,7 +148,8 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
},
);
}
let time_column = column_repo
let time_column = catalog
.columns()
.create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time)
.await?;
new_table_columns.insert(
@ -173,19 +176,25 @@ pub async fn validate_or_insert_schema<T: RepoCollection + Sync + Send>(
/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for
/// each of the partitions.
pub async fn create_or_get_default_records<T: RepoCollection + Sync + Send>(
pub async fn create_or_get_default_records(
kafka_partition_count: i32,
repo: &T,
catalog: &dyn Catalog,
) -> Result<(KafkaTopic, QueryPool, BTreeMap<SequencerId, Sequencer>)> {
let kafka_repo = repo.kafka_topic();
let query_repo = repo.query_pool();
let sequencer_repo = repo.sequencer();
let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?;
let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?;
let kafka_topic = catalog
.kafka_topics()
.create_or_get(SHARED_KAFKA_TOPIC)
.await?;
let query_pool = catalog
.query_pools()
.create_or_get(SHARED_QUERY_POOL)
.await?;
let sequencers = (1..=kafka_partition_count)
.map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition)))
.map(|partition| {
catalog
.sequencers()
.create_or_get(&kafka_topic, KafkaPartition::new(partition))
})
.collect::<FuturesOrdered<_>>()
.map(|v| {
let v = v.expect("failed to create sequencer");
@ -207,13 +216,13 @@ mod tests {
#[tokio::test]
async fn test_validate_or_insert_schema() {
let repo = Arc::new(MemCatalog::new());
let repo = MemCatalog::new();
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap();
let namespace_name = "validate_schema";
// now test with a new namespace
let namespace = repo
.namespace()
.namespaces()
.create(namespace_name, "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();

View File

@ -2,16 +2,16 @@
//! used for testing or for an IOx designed to run without catalog persistence.
use crate::interface::{
Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic,
KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile,
ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId,
QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
};
use async_trait::async_trait;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use uuid::Uuid;
/// In-memory catalog that implements the `Catalog` and individual repo traits from
@ -48,41 +48,41 @@ struct MemCollections {
parquet_files: Vec<ParquetFile>,
}
impl RepoCollection for Arc<MemCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
impl Catalog for MemCatalog {
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
self
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
fn query_pools(&self) -> &dyn QueryPoolRepo {
self
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
fn namespaces(&self) -> &dyn NamespaceRepo {
self
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
fn tables(&self) -> &dyn TableRepo {
self
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
fn columns(&self) -> &dyn ColumnRepo {
self
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
fn sequencers(&self) -> &dyn SequencerRepo {
self
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
fn partitions(&self) -> &dyn PartitionRepo {
self
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
fn tombstones(&self) -> &dyn TombstoneRepo {
self
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
fn parquet_files(&self) -> &dyn ParquetFileRepo {
self
}
}
@ -180,7 +180,11 @@ impl TableRepo for MemCatalog {
async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result<Table> {
let mut collections = self.collections.lock().expect("mutex poisoned");
let table = match collections.tables.iter().find(|t| t.name == name) {
let table = match collections
.tables
.iter()
.find(|t| t.name == name && t.namespace_id == namespace_id)
{
Some(t) => t,
None => {
let table = Table {
@ -250,18 +254,22 @@ impl ColumnRepo for MemCatalog {
}
async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result<Vec<Column>> {
let mut columns = vec![];
let collections = self.collections.lock().expect("mutex poisoned");
for t in collections
let table_ids: Vec<_> = collections
.tables
.iter()
.filter(|t| t.namespace_id == namespace_id)
{
for c in collections.columns.iter().filter(|c| c.table_id == t.id) {
columns.push(c.clone());
}
}
.map(|t| t.id)
.collect();
println!("tables: {:?}", collections.tables);
println!("table_ids: {:?}", table_ids);
let columns: Vec<_> = collections
.columns
.iter()
.filter(|c| table_ids.contains(&c.table_id))
.cloned()
.collect();
Ok(columns)
}
@ -488,11 +496,10 @@ impl ParquetFileRepo for MemCatalog {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[tokio::test]
async fn test_mem_repo() {
let f = || Arc::new(MemCatalog::new());
crate::interface::test_helpers::test_repo(f).await;
async fn test_catalog() {
crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await;
}
}
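A brief sketch (not part of the diff; assumed paths and tokio harness) of the behaviour fixed above: table `create_or_get` now matches on both name and namespace, so the same table name resolves to distinct records per namespace, mirroring the `test_table` case added in `interface.rs`:
use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_catalog::mem::MemCatalog;
#[tokio::test]
async fn tables_are_scoped_to_their_namespace() {
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
    let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap();
    let pool = catalog.query_pools().create_or_get("foo").await.unwrap();
    let ns1 = catalog
        .namespaces()
        .create("ns_one", "inf", kafka.id, pool.id)
        .await
        .unwrap();
    let ns2 = catalog
        .namespaces()
        .create("ns_two", "inf", kafka.id, pool.id)
        .await
        .unwrap();
    // Same table name, different namespaces: two distinct table records.
    let t1 = catalog.tables().create_or_get("cpu", ns1.id).await.unwrap();
    let t2 = catalog.tables().create_or_get("cpu", ns2.id).await.unwrap();
    assert_ne!(t1.id, t2.id);
}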

View File

@ -1,24 +1,23 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::{
Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId,
KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId,
ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneRepo,
Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo,
Timestamp, Tombstone, TombstoneRepo,
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
const MAX_CONNECTIONS: u32 = 5;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
#[allow(dead_code)]
const SCHEMA_NAME: &str = "iox_catalog";
/// the default schema name to use in Postgres
pub const SCHEMA_NAME: &str = "iox_catalog";
/// Postgres catalog that implements the `Catalog` and individual repo traits.
#[derive(Debug)]
@ -62,41 +61,41 @@ impl PostgresCatalog {
}
}
impl RepoCollection for Arc<PostgresCatalog> {
fn kafka_topic(&self) -> Arc<dyn KafkaTopicRepo + Sync + Send> {
Self::clone(self) as Arc<dyn KafkaTopicRepo + Sync + Send>
impl Catalog for PostgresCatalog {
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
self
}
fn query_pool(&self) -> Arc<dyn QueryPoolRepo + Sync + Send> {
Self::clone(self) as Arc<dyn QueryPoolRepo + Sync + Send>
fn query_pools(&self) -> &dyn QueryPoolRepo {
self
}
fn namespace(&self) -> Arc<dyn NamespaceRepo + Sync + Send> {
Self::clone(self) as Arc<dyn NamespaceRepo + Sync + Send>
fn namespaces(&self) -> &dyn NamespaceRepo {
self
}
fn table(&self) -> Arc<dyn TableRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TableRepo + Sync + Send>
fn tables(&self) -> &dyn TableRepo {
self
}
fn column(&self) -> Arc<dyn ColumnRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ColumnRepo + Sync + Send>
fn columns(&self) -> &dyn ColumnRepo {
self
}
fn sequencer(&self) -> Arc<dyn SequencerRepo + Sync + Send> {
Self::clone(self) as Arc<dyn SequencerRepo + Sync + Send>
fn sequencers(&self) -> &dyn SequencerRepo {
self
}
fn partition(&self) -> Arc<dyn PartitionRepo + Sync + Send> {
Self::clone(self) as Arc<dyn PartitionRepo + Sync + Send>
fn partitions(&self) -> &dyn PartitionRepo {
self
}
fn tombstone(&self) -> Arc<dyn TombstoneRepo + Sync + Send> {
Self::clone(self) as Arc<dyn TombstoneRepo + Sync + Send>
fn tombstones(&self) -> &dyn TombstoneRepo {
self
}
fn parquet_file(&self) -> Arc<dyn ParquetFileRepo + Sync + Send> {
Self::clone(self) as Arc<dyn ParquetFileRepo + Sync + Send>
fn parquet_files(&self) -> &dyn ParquetFileRepo {
self
}
}
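Both the in-memory and Postgres backends in this diff move from a `RepoCollection` that clones `Arc<dyn …Repo>` handles to a `Catalog` trait whose accessors simply borrow the repos from a single backing object. A rough sketch of that shape, using hypothetical `FooRepo`/`BarRepo` traits rather than the real catalog traits:

trait FooRepo {
    fn count_foos(&self) -> usize;
}

trait BarRepo {
    fn count_bars(&self) -> usize;
}

/// One backing object implements every repo trait and hands out borrows.
trait Catalog: Send + Sync {
    fn foos(&self) -> &dyn FooRepo;
    fn bars(&self) -> &dyn BarRepo;
}

struct MemBackend;

impl FooRepo for MemBackend {
    fn count_foos(&self) -> usize { 1 }
}

impl BarRepo for MemBackend {
    fn count_bars(&self) -> usize { 2 }
}

impl Catalog for MemBackend {
    fn foos(&self) -> &dyn FooRepo { self }
    fn bars(&self) -> &dyn BarRepo { self }
}

Callers then chain accessors, e.g. `catalog.foos().count_foos()`, without any reference counting or per-repo `Arc` clones.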
@ -586,6 +585,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
mod tests {
use super::*;
use std::env;
use std::sync::Arc;
// Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
macro_rules! maybe_skip_integration {
@ -624,17 +624,15 @@ mod tests {
}};
}
async fn setup_db() -> Arc<PostgresCatalog> {
async fn setup_db() -> PostgresCatalog {
let dsn = std::env::var("DATABASE_URL").unwrap();
Arc::new(
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap(),
)
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap()
}
#[tokio::test]
async fn test_repo() {
async fn test_catalog() {
// If running an integration test on your laptop, this requires that you have Postgres
// running and that you've done the sqlx migrations. See the README in this crate for
// info to set it up.
@ -642,10 +640,9 @@ mod tests {
let postgres = setup_db().await;
clear_schema(&postgres.pool).await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
let f = || Arc::clone(&postgres);
crate::interface::test_helpers::test_repo(f).await;
crate::interface::test_helpers::test_catalog(postgres).await;
}
async fn clear_schema(pool: &Pool<Postgres>) {

View File

@ -545,7 +545,7 @@ fn get_sorted_keys<'a>(
/// Helper to create a simple delete predicate.
pub fn create_delete_predicate(value: i64) -> Arc<DeletePredicate> {
Arc::new(DeletePredicate {
range: TimestampRange { start: 11, end: 22 },
range: TimestampRange::new(11, 22),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
Op::Eq,

View File

@ -13,6 +13,7 @@ schema = { path = "../schema" }
observability_deps = { path = "../observability_deps" }
ordered-float = "2"
regex = "1"
regex-syntax = "0.6.25"
serde_json = "1.0.72"
snafu = "0.7"
sqlparser = "0.13.0"

View File

@ -94,10 +94,7 @@ pub fn parse_delete_predicate(
let delete_exprs = parse_predicate(predicate)?;
Ok(DeletePredicate {
range: TimestampRange {
start: start_time,
end: stop_time,
},
range: TimestampRange::new(start_time, stop_time),
exprs: delete_exprs,
})
}
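Most of the churn in the remaining files replaces `TimestampRange { start, end }` literals with `TimestampRange::new(start, end)` and direct field reads with `start()`/`end()` accessors. A rough sketch of what such an encapsulated range type might look like; the private fields and the debug assertion are assumptions for illustration, not necessarily what `data_types` actually does:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimestampRange {
    start: i64,
    end: i64,
}

impl TimestampRange {
    /// Keeping the fields private lets the constructor enforce any
    /// invariants (e.g. ordering) in a single place.
    pub fn new(start: i64, end: i64) -> Self {
        debug_assert!(start <= end);
        Self { start, end }
    }

    pub fn start(&self) -> i64 {
        self.start
    }

    pub fn end(&self) -> i64 {
        self.end
    }
}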
@ -551,8 +548,8 @@ mod tests {
let pred = r#"cost != 100"#;
let result = parse_delete_predicate(start, stop, pred).unwrap();
assert_eq!(result.range.start, 0);
assert_eq!(result.range.end, 200);
assert_eq!(result.range.start(), 0);
assert_eq!(result.range.end(), 200);
let expected = vec![DeleteExpr::new(
"cost".to_string(),

View File

@ -124,7 +124,7 @@ impl Predicate {
/// `range.start <= time and time < range.end`
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
self.range
.map(|range| make_range_expr(range.start, range.end, TIME_COLUMN_NAME))
.map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME))
}
/// Returns true if ths predicate evaluates to true for all rows
@ -184,8 +184,8 @@ impl Predicate {
// time_expr = NOT(start <= time_range <= end)
// Equivalent to: (time < start OR time > end)
let time_expr = col(TIME_COLUMN_NAME)
.lt(lit_timestamp_nano(range.start))
.or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end)));
.lt(lit_timestamp_nano(range.start()))
.or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end())));
match expr {
None => expr = Some(time_expr),
@ -215,7 +215,7 @@ impl Predicate {
/// existing storage engine
pub fn clear_timestamp_if_max_range(mut self) -> Self {
self.range = self.range.take().and_then(|range| {
if range.start <= MIN_NANO_TIME && range.end >= MAX_NANO_TIME {
if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME {
None
} else {
Some(range)
@ -254,7 +254,7 @@ impl fmt::Display for Predicate {
if let Some(range) = &self.range {
// TODO: could be nice to show this as actual timestamps (not just numbers)?
write!(f, " range: [{} - {}]", range.start, range.end)?;
write!(f, " range: [{} - {}]", range.start(), range.end())?;
}
if !self.exprs.is_empty() {
@ -313,7 +313,7 @@ impl PredicateBuilder {
"Unexpected re-definition of timestamp range"
);
self.inner.range = Some(TimestampRange { start, end });
self.inner.range = Some(TimestampRange::new(start, end));
self
}

View File

@ -30,6 +30,11 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
// N.B., this function does not utilise the Arrow regexp compute kernel because
// in order to act as a filter it needs to return a boolean array of comparison
// results, not an array of strings as the regex compute kernel does.
// Attempt to make the pattern compatible with what is accepted by
// the golang regexp library which is different than Rust's regexp
let pattern = clean_non_meta_escapes(pattern);
let func = move |args: &[ArrayRef]| {
assert_eq!(args.len(), 1); // only works over a single column at a time.
@ -72,6 +77,79 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
udf.call(vec![input])
}
fn is_valid_character_after_escape(c: char) -> bool {
// same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538
match c {
'0'..='7' => true,
'8'..='9' => true,
'x' | 'u' | 'U' => true,
'p' | 'P' => true,
'd' | 's' | 'w' | 'D' | 'S' | 'W' => true,
_ => regex_syntax::is_meta_character(c),
}
}
/// Removes all `\` escapes that the rust regex library would reject
/// and rewrites them to their unescaped form.
///
/// For example, `\:` is rewritten to `:` as `\:` is not a valid
/// escape sequence in the `regexp` crate but is valid in golang's
/// regexp implementation.
///
/// This is done for compatibility purposes so that the regular
/// expression matching in Rust more closely follows the matching in
/// golang, used by the influx storage rpc.
///
/// See <https://github.com/rust-lang/regex/issues/501> for more details
fn clean_non_meta_escapes(pattern: String) -> String {
if pattern.is_empty() {
return pattern;
}
#[derive(Debug, Copy, Clone)]
enum SlashState {
No,
Single,
Double,
}
let mut next_state = SlashState::No;
let next_chars = pattern
.chars()
.map(Some)
.skip(1)
.chain(std::iter::once(None));
// emit char based on previous
let new_pattern: String = pattern
.chars()
.zip(next_chars)
.filter_map(|(c, next_char)| {
let cur_state = next_state;
next_state = match (c, cur_state) {
('\\', SlashState::No) => SlashState::Single,
('\\', SlashState::Single) => SlashState::Double,
('\\', SlashState::Double) => SlashState::Single,
_ => SlashState::No,
};
// Decide to emit `c` or not
match (cur_state, c, next_char) {
(SlashState::No, '\\', Some(next_char))
| (SlashState::Double, '\\', Some(next_char))
if !is_valid_character_after_escape(next_char) =>
{
None
}
_ => Some(c),
}
})
.collect();
new_pattern
}
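As a usage sketch of the cleaning step above: run the incoming pattern through `clean_non_meta_escapes` before handing it to the `regex` crate, so golang-style escapes such as `\:` no longer trip the Rust parser. This assumes the function defined above and the `regex` crate (already a dependency of this crate) are in scope:

use regex::Regex;

/// Compile a golang-flavoured pattern with the rust `regex` crate by first
/// rewriting escapes that rust would otherwise reject.
fn compile_compat(pattern: &str) -> Result<Regex, regex::Error> {
    let cleaned = clean_non_meta_escapes(pattern.to_string());
    Regex::new(&cleaned)
}

// e.g. `compile_compat(r#"https\://influxdb\.com"#)` compiles to a regex that
// matches "https://influxdb.com", while the raw pattern may be rejected
// outright by `Regex::new` (see the linked rust-lang/regex issue).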
#[cfg(test)]
mod test {
use arrow::{
@ -88,6 +166,8 @@ mod test {
};
use std::sync::Arc;
use super::clean_non_meta_escapes;
#[tokio::test]
async fn regex_match_expr() {
let cases = vec![
@ -220,4 +300,37 @@ mod test {
.map(|s| s.to_owned())
.collect())
}
#[test]
fn test_clean_non_meta_escapes() {
let cases = vec![
("", ""),
(r#"\"#, r#"\"#),
(r#"\\"#, r#"\\"#),
// : is not a special meta character
(r#"\:"#, r#":"#),
// . is a special meta character
(r#"\."#, r#"\."#),
(r#"foo\"#, r#"foo\"#),
(r#"foo\\"#, r#"foo\\"#),
(r#"foo\:"#, r#"foo:"#),
(r#"foo\xff"#, r#"foo\xff"#),
(r#"fo\\o"#, r#"fo\\o"#),
(r#"fo\:o"#, r#"fo:o"#),
(r#"fo\:o\x123"#, r#"fo:o\x123"#),
(r#"fo\:o\x123\:"#, r#"fo:o\x123:"#),
(r#"foo\\\:bar"#, r#"foo\\:bar"#),
(r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#),
("foo", "foo"),
];
for (pattern, expected) in cases {
let cleaned_pattern = clean_non_meta_escapes(pattern.to_string());
assert_eq!(
cleaned_pattern, expected,
"Expected '{}' to be cleaned to '{}', got '{}'",
pattern, expected, cleaned_pattern
);
}
}
}

View File

@ -62,10 +62,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete {
// 2 rows of h2o with timestamp 200 and 350 will be deleted
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 120,
end: 250,
},
range: TimestampRange::new(120, 250),
exprs: vec![],
};
@ -104,10 +101,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDeleteAll {
// pred: delete from h20 where 100 <= time <= 360
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 100,
end: 360,
},
range: TimestampRange::new(100, 360),
exprs: vec![],
};
@ -483,7 +477,7 @@ async fn test_read_filter_data_pred_using_regex_match() {
.build();
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -514,7 +508,7 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() {
.build();
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
];
run_read_filter_test_case(
TwoMeasurementsMultiSeriesWithDelete {},
@ -542,14 +536,59 @@ async fn test_read_filter_data_pred_using_regex_not_match() {
.build();
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_regex_escape() {
let predicate = PredicateBuilder::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`,
.build_regex_match_expr("url", r#"https\://influxdb\.com"#)
.build();
// expect one series with influxdb.com
let expected_results = vec![
"Series tags={_measurement=status_code, url=https://influxdb.com, _field=value}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
}
pub struct MeasurementStatusCode {}
#[async_trait]
impl DbSetup for MeasurementStatusCode {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "2018-05-22T19";
let lp = vec![
"status_code,url=http://www.example.com value=404 1527018806000000000",
"status_code,url=https://influxdb.com value=418 1527018816000000000",
];
all_scenarios_for_one_chunk(vec![], vec![], lp, "status_code", partition_key).await
}
}
#[tokio::test]
async fn test_read_filter_data_pred_not_match_regex_escape() {
let predicate = PredicateBuilder::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`,
.build_regex_not_match_expr("url", r#"https\://influxdb\.com"#)
.build();
// expect one series with www.example.com (the influxdb.com series is excluded)
let expected_results = vec![
"Series tags={_measurement=status_code, url=http://www.example.com, _field=value}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_unsupported_in_scan() {
test_helpers::maybe_start_logging();
@ -649,10 +688,7 @@ impl DbSetup for MeasurementsSortableTagsWithDelete {
// 1 rows of h2o with timestamp 250 will be deleted
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 120,
end: 350,
},
range: TimestampRange::new(120, 350),
exprs: vec![DeleteExpr::new(
"state".to_string(),
data_types::delete_predicate::Op::Eq,

View File

@ -86,7 +86,7 @@ impl DbSetup for OneMeasurementNoTagsWithDelete {
// 1 row of m0 with timestamp 1
let delete_table_name = "m0";
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 1 },
range: TimestampRange::new(1, 1),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
@ -118,7 +118,7 @@ impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {
// pred: delete from m0 where 1 <= time <= 2
let delete_table_name = "m0";
let pred = DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![],
};

View File

@ -315,10 +315,7 @@ impl DbSetup for MeasurementForDefect2697WithDelete {
// 1 row of m0 with timestamp 1609459201000000022 (section=2b bar=1.2)
let delete_table_name = "mm";
let pred = DeletePredicate {
range: TimestampRange {
start: 1609459201000000022,
end: 1609459201000000022,
},
range: TimestampRange::new(1609459201000000022, 1609459201000000022),
exprs: vec![],
};
@ -347,10 +344,7 @@ impl DbSetup for MeasurementForDefect2697WithDeleteAll {
// pred: delete from mm where 1 <= time <= 1609459201000000031
let delete_table_name = "mm";
let pred = DeletePredicate {
range: TimestampRange {
start: 1,
end: 1609459201000000031,
},
range: TimestampRange::new(1, 1609459201000000031),
exprs: vec![],
};

View File

@ -279,10 +279,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDelete {
// 3 rows of h2o & NY state will be deleted
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 400,
end: 602,
},
range: TimestampRange::new(400, 602),
exprs: vec![DeleteExpr::new(
"state".to_string(),
data_types::delete_predicate::Op::Eq,
@ -322,10 +319,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDeleteAll {
// all rows of h2o will be deleted
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 100,
end: 602,
},
range: TimestampRange::new(100, 602),
exprs: vec![],
};
@ -412,10 +406,7 @@ impl DbSetup for TwoMeasurementsWithDelete {
// delete 1 row from cpu with timestamp 150
let table_name = "cpu";
let pred = DeletePredicate {
range: TimestampRange {
start: 120,
end: 160,
},
range: TimestampRange::new(120, 160),
exprs: vec![DeleteExpr::new(
"region".to_string(),
data_types::delete_predicate::Op::Eq,
@ -447,10 +438,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll {
// which will delete second row of the cpu
let table_name = "cpu";
let pred1 = DeletePredicate {
range: TimestampRange {
start: 120,
end: 160,
},
range: TimestampRange::new(120, 160),
exprs: vec![DeleteExpr::new(
"region".to_string(),
data_types::delete_predicate::Op::Eq,
@ -460,7 +448,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll {
// delete the first row of the cpu
let pred2 = DeletePredicate {
range: TimestampRange { start: 0, end: 110 },
range: TimestampRange::new(0, 110),
exprs: vec![],
};
@ -885,10 +873,7 @@ impl DbSetup for OneMeasurementManyFieldsWithDelete {
// field4 no longer available
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
start: 1000,
end: 1100,
},
range: TimestampRange::new(1000, 1100),
exprs: vec![],
};
@ -950,10 +935,7 @@ impl DbSetup for EndToEndTestWithDelete {
// 1 rows of h2o with timestamp 250 will be deleted
let delete_table_name = "swap";
let pred = DeletePredicate {
range: TimestampRange {
start: 6000,
end: 6000,
},
range: TimestampRange::new(6000, 6000),
exprs: vec![DeleteExpr::new(
"name".to_string(),
data_types::delete_predicate::Op::Eq,

View File

@ -31,7 +31,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
// delete predicate
let pred = DeletePredicate {
range: TimestampRange { start: 10, end: 20 },
range: TimestampRange::new(10, 20),
exprs: vec![],
};
@ -54,7 +54,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
// delete predicate
let pred = DeletePredicate {
range: TimestampRange { start: 0, end: 15 },
range: TimestampRange::new(0, 15),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Eq,
@ -106,7 +106,7 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
];
// delete predicate
let pred = DeletePredicate {
range: TimestampRange { start: 0, end: 30 },
range: TimestampRange::new(0, 30),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
@ -149,7 +149,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
// delete predicate
// pred1: delete from cpu where 0 <= time <= 32 and bar = 1 and foo = 'me'
let pred1 = DeletePredicate {
range: TimestampRange { start: 0, end: 32 },
range: TimestampRange::new(0, 32),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
@ -166,7 +166,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
// pred2: delete from cpu where 10 <= time <= 40 and bar != 1
let pred2 = DeletePredicate {
range: TimestampRange { start: 10, end: 40 },
range: TimestampRange::new(10, 40),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Ne,
@ -206,7 +206,7 @@ impl DbSetup for ThreeDeleteThreeChunks {
];
// delete predicate on chunk 1
let pred1 = DeletePredicate {
range: TimestampRange { start: 0, end: 30 },
range: TimestampRange::new(0, 30),
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
@ -230,7 +230,7 @@ impl DbSetup for ThreeDeleteThreeChunks {
];
// delete predicate on chunk 1 & chunk 2
let pred2 = DeletePredicate {
range: TimestampRange { start: 20, end: 45 },
range: TimestampRange::new(20, 45),
exprs: vec![DeleteExpr::new(
"foo".to_string(),
data_types::delete_predicate::Op::Eq,
@ -247,7 +247,7 @@ impl DbSetup for ThreeDeleteThreeChunks {
];
// delete predicate on chunk 3
let pred3 = DeletePredicate {
range: TimestampRange { start: 75, end: 95 },
range: TimestampRange::new(75, 95),
exprs: vec![DeleteExpr::new(
"bar".to_string(),
data_types::delete_predicate::Op::Ne,

View File

@ -410,7 +410,7 @@ mod tests {
);
let delete = DmlOperation::Delete(DmlDelete::new(
DeletePredicate {
range: TimestampRange { start: 1, end: 2 },
range: TimestampRange::new(1, 2),
exprs: vec![],
},
Some(NonEmptyString::new("foo_foo").unwrap()),

View File

@ -101,10 +101,7 @@ async fn delete_predicate_preservation() {
// ==================== do: delete ====================
let pred = Arc::new(DeletePredicate {
range: TimestampRange {
start: 0,
end: 1_000,
},
range: TimestampRange::new(0, 1_000),
exprs: vec![DeleteExpr::new(
"selector".to_string(),
data_types::delete_predicate::Op::Eq,

View File

@ -179,7 +179,7 @@ async fn write_buffer_deletes() {
fixture
.delete(DmlDelete::new(
DeletePredicate {
range: TimestampRange { start: 0, end: 20 },
range: TimestampRange::new(0, 20),
exprs: vec![DeleteExpr {
column: "x".to_string(),
op: Op::Eq,

View File

@ -72,6 +72,11 @@ impl Span {
pub fn child(&self, name: impl Into<Cow<'static, str>>) -> Self {
self.ctx.child(name)
}
/// Link this span to another context.
pub fn link(&mut self, other: &SpanContext) {
self.ctx.links.push((other.trace_id, other.span_id));
}
}
#[derive(Debug, Clone)]
@ -126,7 +131,7 @@ pub struct SpanRecorder {
span: Option<Span>,
}
impl<'a> SpanRecorder {
impl SpanRecorder {
pub fn new(mut span: Option<Span>) -> Self {
if let Some(span) = span.as_mut() {
span.start = Some(Utc::now());
@ -177,9 +182,16 @@ impl<'a> SpanRecorder {
pub fn span(&self) -> Option<&Span> {
self.span.as_ref()
}
/// Link this span to another context.
pub fn link(&mut self, other: &SpanContext) {
if let Some(span) = self.span.as_mut() {
span.link(other);
}
}
}
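The new `link` methods let a span record a causal relationship to another trace context, e.g. a write-buffer consumer span pointing back at the producer that wrote the message it is processing. A stripped-down illustration of the idea with simplified stand-in types (not the real `trace` crate types):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TraceId(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SpanId(u64);

#[derive(Debug)]
struct SpanContext {
    trace_id: TraceId,
    span_id: SpanId,
}

#[derive(Debug, Default)]
struct Span {
    // (trace id, span id) pairs of other contexts this span is related to
    links: Vec<(TraceId, SpanId)>,
}

impl Span {
    fn link(&mut self, other: &SpanContext) {
        self.links.push((other.trace_id, other.span_id));
    }
}

fn main() {
    let producer_ctx = SpanContext { trace_id: TraceId(1), span_id: SpanId(42) };
    let mut consumer_span = Span::default();
    consumer_span.link(&producer_ctx);
    assert_eq!(consumer_span.links, vec![(TraceId(1), SpanId(42))]);
}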
impl<'a> Drop for SpanRecorder {
impl Drop for SpanRecorder {
fn drop(&mut self) {
if let Some(mut span) = self.span.take() {
let now = Utc::now();

View File

@ -20,6 +20,7 @@ parking_lot = "0.11.2"
pin-project = "1.0"
prost = "0.9"
rdkafka = { version = "0.28.0", optional = true }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="63cf0a541b864d5376b2f96537a5bbb610d0e4bc" }
time = { path = "../time" }
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.6.9"

View File

@ -5,6 +5,7 @@ use crate::{
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
},
rskafka::{RSKafkaConsumer, RSKafkaProducer},
};
use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
use parking_lot::RwLock;
@ -16,12 +17,6 @@ use std::{
use time::TimeProvider;
use trace::TraceCollector;
#[derive(Debug)]
pub enum WriteBufferConfig {
Writing(Arc<dyn WriteBufferWriting>),
Reading(Arc<tokio::sync::Mutex<Box<dyn WriteBufferReading>>>),
}
#[derive(Debug, Clone)]
enum Mock {
Normal(MockBufferSharedState),
@ -121,6 +116,16 @@ impl WriteBufferConfigFactory {
Arc::new(mock_buffer) as _
}
},
"rskafka" => {
let rskafka_buffer = RSKafkaProducer::new(
cfg.connection.clone(),
db_name.to_owned(),
cfg.creation_config.as_ref(),
Arc::clone(&self.time_provider),
)
.await?;
Arc::new(rskafka_buffer) as _
}
other => {
return Err(format!("Unknown write buffer type: {}", other).into());
}
@ -196,6 +201,16 @@ impl WriteBufferConfigFactory {
Box::new(mock_buffer) as _
}
},
"rskafka" => {
let rskafka_buffer = RSKafkaConsumer::new(
cfg.connection.clone(),
db_name.to_owned(),
cfg.creation_config.as_ref(),
trace_collector.map(Arc::clone),
)
.await?;
Box::new(rskafka_buffer) as _
}
other => {
return Err(format!("Unknown write buffer type: {}", other).into());
}
@ -245,7 +260,10 @@ impl WriteBufferConfigFactory {
#[cfg(test)]
mod tests {
use super::*;
use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};
use crate::{
core::test_utils::random_topic_name, maybe_skip_kafka_integration,
mock::MockBufferSharedState,
};
use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
use std::{convert::TryFrom, num::NonZeroU32};
use tempfile::TempDir;
@ -446,10 +464,49 @@ mod tests {
WriteBufferConfigFactory::new(time, registry)
}
#[tokio::test]
async fn test_writing_rskafka() {
let conn = maybe_skip_kafka_integration!();
let factory = factory();
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
let cfg = WriteBufferConnection {
type_: "rskafka".to_string(),
connection: conn,
creation_config: Some(WriteBufferCreationConfig::default()),
..Default::default()
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "rskafka");
}
#[tokio::test]
async fn test_reading_rskafka() {
let conn = maybe_skip_kafka_integration!();
let factory = factory();
let server_id = ServerId::try_from(1).unwrap();
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
let cfg = WriteBufferConnection {
type_: "rskafka".to_string(),
connection: conn,
creation_config: Some(WriteBufferCreationConfig::default()),
..Default::default()
};
let conn = factory
.new_config_read(server_id, db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "rskafka");
}
#[cfg(feature = "kafka")]
mod kafka {
use super::*;
use crate::maybe_skip_kafka_integration;
#[tokio::test]
async fn test_writing_kafka() {

View File

@ -758,4 +758,51 @@ pub mod test_utils {
{
set.iter().next().cloned().map(|k| set.take(&k)).flatten()
}
/// Get the testing Kafka connection string or return early from the current scope.
///
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECT`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
#[macro_export]
macro_rules! maybe_skip_kafka_integration {
() => {{
use std::env;
dotenv::dotenv().ok();
match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \
`docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \
set KAFKA_CONNECT to the host and port where Kafka is accessible. If \
running the `docker-compose` command and the Rust tests on the host, the \
value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \
tests in another container in the `docker-compose` network as on CI, \
`KAFKA_CONNECT` should be `kafka:9092`."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}
}
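The doc comment above describes the gating behaviour; in practice the macro is invoked as the first statement of an integration test so that its early `return` skips the rest of the body. A hypothetical test showing the usage pattern (the body is a placeholder):

#[tokio::test]
async fn test_against_kafka() {
    // Skips the test unless TEST_INTEGRATION is set; panics if TEST_INTEGRATION
    // is set but KAFKA_CONNECT is not.
    let conn = maybe_skip_kafka_integration!();

    // ... exercise the write buffer against the broker at `conn` ...
    let _ = conn;
}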

View File

@ -703,53 +703,6 @@ pub mod test_utils {
use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
use std::{collections::BTreeMap, time::Duration};
/// Get the testing Kafka connection string or return current scope.
///
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECTION`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
#[macro_export]
macro_rules! maybe_skip_kafka_integration {
() => {{
use std::env;
dotenv::dotenv().ok();
match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \
`docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \
set KAFKA_CONNECT to the host and port where Kafka is accessible. If \
running the `docker-compose` command and the Rust tests on the host, the \
value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \
tests in another container in the `docker-compose` network as on CI, \
`KAFKA_CONNECT` should be `kafka:9092`."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}
/// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`]
pub fn kafka_sequencer_options() -> BTreeMap<String, String> {
BTreeMap::from([

View File

@ -17,3 +17,5 @@ pub mod file;
pub mod kafka;
pub mod mock;
pub mod rskafka;

436
write_buffer/src/rskafka.rs Normal file
View File

@ -0,0 +1,436 @@
use std::{
collections::{BTreeMap, BTreeSet},
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlOperation};
use futures::{FutureExt, StreamExt};
use observability_deps::tracing::debug;
use rskafka::{
client::{
consumer::StreamConsumerBuilder,
error::{Error as RSKafkaError, ProtocolError},
partition::PartitionClient,
ClientBuilder,
},
record::Record,
};
use time::{Time, TimeProvider};
use trace::TraceCollector;
use crate::{
codec::{ContentType, IoxHeaders},
core::{
FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting, WriteStream,
},
};
type Result<T, E = WriteBufferError> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct RSKafkaProducer {
database_name: String,
time_provider: Arc<dyn TimeProvider>,
// TODO: batched writes
partition_clients: BTreeMap<u32, PartitionClient>,
}
impl RSKafkaProducer {
pub async fn new(
conn: String,
database_name: String,
creation_config: Option<&WriteBufferCreationConfig>,
time_provider: Arc<dyn TimeProvider>,
) -> Result<Self> {
let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?;
Ok(Self {
database_name,
time_provider,
partition_clients,
})
}
}
#[async_trait]
impl WriteBufferWriting for RSKafkaProducer {
fn sequencer_ids(&self) -> BTreeSet<u32> {
self.partition_clients.keys().copied().collect()
}
async fn store_operation(
&self,
sequencer_id: u32,
operation: &DmlOperation,
) -> Result<DmlMeta, WriteBufferError> {
let partition_client = self
.partition_clients
.get(&sequencer_id)
.ok_or_else::<WriteBufferError, _>(|| {
format!("Unknown partition: {}", sequencer_id).into()
})?;
// truncate milliseconds from timestamps because that's what Kafka supports
let now = operation
.meta()
.producer_ts()
.unwrap_or_else(|| self.time_provider.now());
let timestamp_millis = now.date_time().timestamp_millis();
let timestamp = Time::from_timestamp_millis(timestamp_millis);
let headers = IoxHeaders::new(
ContentType::Protobuf,
operation.meta().span_context().cloned(),
);
let mut buf = Vec::new();
crate::codec::encode_operation(&self.database_name, operation, &mut buf)?;
let buf_len = buf.len();
let record = Record {
key: Default::default(),
value: buf,
headers: headers
.headers()
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
.collect(),
timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos(
timestamp_millis as i128 * 1_000_000,
)?,
};
let kafka_write_size = record.approximate_size();
debug!(db_name=%self.database_name, partition=sequencer_id, size=buf_len, "writing to kafka");
let offsets = partition_client.produce(vec![record]).await?;
let offset = offsets[0];
debug!(db_name=%self.database_name, %offset, partition=sequencer_id, size=buf_len, "wrote to kafka");
Ok(DmlMeta::sequenced(
Sequence::new(sequencer_id, offset.try_into()?),
timestamp,
operation.meta().span_context().cloned(),
kafka_write_size,
))
}
fn type_name(&self) -> &'static str {
"rskafka"
}
}
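The producer above drops sub-millisecond precision because Kafka record timestamps are millisecond-based, so the consumer will only ever see the truncated value. A back-of-the-envelope sketch of that conversion, using plain arithmetic rather than the `time`/`rskafka` types:

/// Truncate a nanosecond timestamp to the millisecond precision a Kafka
/// record can carry, and show the nanosecond value a reader gets back.
fn truncate_to_kafka_precision(nanos: i64) -> (i64, i128) {
    let millis = nanos / 1_000_000; // sub-millisecond digits are lost here
    let round_tripped_nanos = millis as i128 * 1_000_000;
    (millis, round_tripped_nanos)
}

// 1_527_018_816_123_456_789 ns -> 1_527_018_816_123 ms -> 1_527_018_816_123_000_000 ns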
#[derive(Debug)]
struct ConsumerPartition {
partition_client: Arc<PartitionClient>,
next_offset: Arc<AtomicI64>,
}
#[derive(Debug)]
pub struct RSKafkaConsumer {
partitions: BTreeMap<u32, ConsumerPartition>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl RSKafkaConsumer {
pub async fn new(
conn: String,
database_name: String,
creation_config: Option<&WriteBufferCreationConfig>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Self> {
let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?;
let partitions = partition_clients
.into_iter()
.map(|(partition_id, partition_client)| {
let partition_client = Arc::new(partition_client);
let next_offset = Arc::new(AtomicI64::new(0));
(
partition_id,
ConsumerPartition {
partition_client,
next_offset,
},
)
})
.collect();
Ok(Self {
partitions,
trace_collector,
})
}
}
#[async_trait]
impl WriteBufferReading for RSKafkaConsumer {
fn streams(&mut self) -> BTreeMap<u32, WriteStream<'_>> {
let mut streams = BTreeMap::new();
for (sequencer_id, partition) in &self.partitions {
let trace_collector = self.trace_collector.clone();
let next_offset = Arc::clone(&partition.next_offset);
let stream = StreamConsumerBuilder::new(
Arc::clone(&partition.partition_client),
next_offset.load(Ordering::SeqCst),
)
.with_max_wait_ms(100)
.build();
let stream = stream.map(move |res| {
let (record, _watermark) = res?;
let kafka_read_size = record.record.approximate_size();
let headers =
IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?;
let sequence = Sequence {
id: *sequencer_id,
number: record.offset.try_into()?,
};
let timestamp_millis =
i64::try_from(record.record.timestamp.unix_timestamp_nanos() / 1_000_000)?;
let timestamp = Time::from_timestamp_millis_opt(timestamp_millis)
.ok_or_else::<WriteBufferError, _>(|| {
format!(
"Cannot parse timestamp for milliseconds: {}",
timestamp_millis
)
.into()
})?;
next_offset.store(record.offset + 1, Ordering::SeqCst);
crate::codec::decode(
&record.record.value,
headers,
sequence,
timestamp,
kafka_read_size,
)
});
let stream = stream.boxed();
let partition_client = Arc::clone(&partition.partition_client);
let fetch_high_watermark = move || {
let partition_client = Arc::clone(&partition_client);
let fut = async move {
let watermark = partition_client.get_high_watermark().await?;
u64::try_from(watermark).map_err(|e| Box::new(e) as WriteBufferError)
};
fut.boxed() as FetchHighWatermarkFut<'_>
};
let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>;
streams.insert(
*sequencer_id,
WriteStream {
stream,
fetch_high_watermark,
},
);
}
streams
}
async fn seek(
&mut self,
sequencer_id: u32,
sequence_number: u64,
) -> Result<(), WriteBufferError> {
let partition = self
.partitions
.get_mut(&sequencer_id)
.ok_or_else::<WriteBufferError, _>(|| {
format!("Unknown partition: {}", sequencer_id).into()
})?;
let offset = i64::try_from(sequence_number)?;
partition.next_offset.store(offset, Ordering::SeqCst);
Ok(())
}
fn type_name(&self) -> &'static str {
"rskafka"
}
}
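The consumer keeps the next offset to read in an `AtomicI64` shared between the stream (which bumps it past every decoded record) and `seek` (which overwrites it). A stripped-down sketch of that shared-cursor idea, independent of the Kafka client types:

use std::sync::{
    atomic::{AtomicI64, Ordering},
    Arc,
};

struct OffsetCursor {
    next_offset: Arc<AtomicI64>,
}

impl OffsetCursor {
    fn new() -> Self {
        Self { next_offset: Arc::new(AtomicI64::new(0)) }
    }

    /// What the stream does after successfully decoding a record.
    fn record_consumed(&self, offset: i64) {
        self.next_offset.store(offset + 1, Ordering::SeqCst);
    }

    /// What `seek` does: jump to an arbitrary sequence number.
    fn seek(&self, offset: i64) {
        self.next_offset.store(offset, Ordering::SeqCst);
    }

    /// Where the next poll should start.
    fn start_offset(&self) -> i64 {
        self.next_offset.load(Ordering::SeqCst)
    }
}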
async fn setup_topic(
conn: String,
database_name: String,
creation_config: Option<&WriteBufferCreationConfig>,
) -> Result<BTreeMap<u32, PartitionClient>> {
let client = ClientBuilder::new(vec![conn]).build().await?;
let controller_client = client.controller_client().await?;
loop {
// check if topic already exists
let topics = client.list_topics().await?;
if let Some(topic) = topics.into_iter().find(|t| t.name == database_name) {
let mut partition_clients = BTreeMap::new();
for partition in topic.partitions {
let c = client.partition_client(&database_name, partition).await?;
let partition = u32::try_from(partition)?;
partition_clients.insert(partition, c);
}
return Ok(partition_clients);
}
// create topic
if let Some(creation_config) = creation_config {
match controller_client
.create_topic(&database_name, creation_config.n_sequencers.get() as i32, 1)
.await
{
Ok(_) => {}
// race condition between check and creation action, that's OK
Err(RSKafkaError::ServerError(ProtocolError::TopicAlreadyExists, _)) => {}
Err(e) => {
return Err(e.into());
}
}
} else {
return Err("no partitions found and auto-creation not requested"
.to_string()
.into());
}
}
}
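`setup_topic` tolerates the race where several callers create the same topic concurrently: it loops, re-listing topics and treating `TopicAlreadyExists` as success, so the winner and the losers all end up with partition clients. A generic sketch of that check-then-create loop, with a hypothetical `Backend` trait standing in for the Kafka client:

/// Hypothetical backend; the real code uses rskafka's controller and
/// partition clients.
trait Backend {
    fn exists(&self, name: &str) -> bool;
    /// Ok(()) on success, Err(AlreadyExists) if someone else won the race.
    fn create(&self, name: &str) -> Result<(), CreateError>;
}

#[derive(Debug)]
enum CreateError {
    AlreadyExists,
    Other(String),
}

fn ensure_exists(backend: &dyn Backend, name: &str) -> Result<(), String> {
    loop {
        // 1. check (assumes the listing eventually reflects a created topic)
        if backend.exists(name) {
            return Ok(());
        }
        // 2. create, tolerating the race between check and creation
        match backend.create(name) {
            // re-check so every caller picks up the topic metadata
            Ok(()) | Err(CreateError::AlreadyExists) => continue,
            Err(CreateError::Other(e)) => return Err(e),
        }
    }
}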
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use futures::{stream::FuturesUnordered, TryStreamExt};
use trace::RingBufferTraceCollector;
use crate::{
core::test_utils::{perform_generic_tests, random_topic_name, TestAdapter, TestContext},
maybe_skip_kafka_integration,
};
use super::*;
struct RSKafkaTestAdapter {
conn: String,
}
impl RSKafkaTestAdapter {
fn new(conn: String) -> Self {
Self { conn }
}
}
#[async_trait]
impl TestAdapter for RSKafkaTestAdapter {
type Context = RSKafkaTestContext;
async fn new_context_with_time(
&self,
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
) -> Self::Context {
RSKafkaTestContext {
conn: self.conn.clone(),
database_name: random_topic_name(),
n_sequencers,
time_provider,
}
}
}
struct RSKafkaTestContext {
conn: String,
database_name: String,
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
}
impl RSKafkaTestContext {
fn creation_config(&self, value: bool) -> Option<WriteBufferCreationConfig> {
value.then(|| WriteBufferCreationConfig {
n_sequencers: self.n_sequencers,
..Default::default()
})
}
}
#[async_trait]
impl TestContext for RSKafkaTestContext {
type Writing = RSKafkaProducer;
type Reading = RSKafkaConsumer;
async fn writing(&self, creation_config: bool) -> Result<Self::Writing, WriteBufferError> {
RSKafkaProducer::new(
self.conn.clone(),
self.database_name.clone(),
self.creation_config(creation_config).as_ref(),
Arc::clone(&self.time_provider),
)
.await
}
async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError> {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
RSKafkaConsumer::new(
self.conn.clone(),
self.database_name.clone(),
self.creation_config(creation_config).as_ref(),
Some(collector),
)
.await
}
}
#[tokio::test]
async fn test_generic() {
let conn = maybe_skip_kafka_integration!();
perform_generic_tests(RSKafkaTestAdapter::new(conn)).await;
}
#[tokio::test]
async fn test_setup_topic_race() {
let conn = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();
let n_partitions = NonZeroU32::new(2).unwrap();
let mut jobs: FuturesUnordered<_> = (0..10)
.map(|_| {
let conn = conn.clone();
let topic_name = topic_name.clone();
tokio::spawn(async move {
setup_topic(
conn,
topic_name,
Some(&WriteBufferCreationConfig {
n_sequencers: n_partitions,
..Default::default()
}),
)
.await
.unwrap();
})
})
.collect();
while jobs.try_next().await.unwrap().is_some() {}
}
}