Merge branch 'main' into meta-script

pull/24376/head
kodiakhq[bot] 2021-11-26 11:57:26 +00:00 committed by GitHub
commit 473ed75253
17 changed files with 57 additions and 150 deletions

View File

@@ -1,23 +1,10 @@
 use std::{collections::BTreeMap, num::NonZeroU32};
-/// If the buffer is used for reading or writing.
-#[derive(Debug, Eq, PartialEq, Clone, Hash)]
-pub enum WriteBufferDirection {
-    /// Writes into the buffer aka "producer".
-    Write,
-    /// Reads from the buffer aka "consumer".
-    Read,
-}
 pub const DEFAULT_N_SEQUENCERS: u32 = 1;
 /// Configures the use of a write buffer.
 #[derive(Debug, Eq, PartialEq, Clone, Hash)]
 pub struct WriteBufferConnection {
-    /// If the buffer is used for reading or writing.
-    pub direction: WriteBufferDirection,
     /// Which type should be used (e.g. "kafka", "mock")
     pub type_: String,
@@ -39,7 +26,6 @@ pub struct WriteBufferConnection {
 impl Default for WriteBufferConnection {
     fn default() -> Self {
         Self {
-            direction: WriteBufferDirection::Read,
             type_: "unspecified".to_string(),
             connection: Default::default(),
             connection_config: Default::default(),

View File

@@ -7,19 +7,8 @@ import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
 // Configures the use of a write buffer.
 message WriteBufferConnection {
-  enum Direction {
-    // Unspecified direction, will be treated as an error.
-    DIRECTION_UNSPECIFIED = 0;
-    // Writes into the buffer aka "producer".
-    DIRECTION_WRITE = 1;
-    // Reads from the buffer aka "consumer".
-    DIRECTION_READ = 2;
-  }
-  // If the buffer is used for reading or writing.
-  Direction direction = 1;
+  reserved 1;
+  reserved "direction";
   // Which type should be used (e.g. "kafka", "mock")
   string type = 2;

View File

@@ -3,15 +3,13 @@ use crate::{
     influxdata::iox::write_buffer::v1 as write_buffer,
 };
 use data_types::write_buffer::{
-    WriteBufferConnection, WriteBufferCreationConfig, WriteBufferDirection, DEFAULT_N_SEQUENCERS,
+    WriteBufferConnection, WriteBufferCreationConfig, DEFAULT_N_SEQUENCERS,
 };
 use std::{convert::TryFrom, num::NonZeroU32};
 impl From<WriteBufferConnection> for write_buffer::WriteBufferConnection {
     fn from(v: WriteBufferConnection) -> Self {
-        let direction: write_buffer::write_buffer_connection::Direction = v.direction.into();
         Self {
-            direction: direction.into(),
             r#type: v.type_,
             connection: v.connection,
             connection_config: v.connection_config.into_iter().collect(),
@@ -20,15 +18,6 @@ impl From<WriteBufferConnection> for write_buffer::WriteBufferConnection {
     }
 }
-impl From<WriteBufferDirection> for write_buffer::write_buffer_connection::Direction {
-    fn from(v: WriteBufferDirection) -> Self {
-        match v {
-            WriteBufferDirection::Read => Self::Read,
-            WriteBufferDirection::Write => Self::Write,
-        }
-    }
-}
 impl From<WriteBufferCreationConfig> for write_buffer::WriteBufferCreationConfig {
     fn from(v: WriteBufferCreationConfig) -> Self {
         Self {
@@ -42,10 +31,7 @@ impl TryFrom<write_buffer::WriteBufferConnection> for WriteBufferConnection {
     type Error = FieldViolation;
     fn try_from(proto: write_buffer::WriteBufferConnection) -> Result<Self, Self::Error> {
-        use write_buffer::write_buffer_connection::Direction;
         Ok(Self {
-            direction: Direction::from_i32(proto.direction).required("direction")?,
             type_: proto.r#type,
             connection: proto.connection,
             connection_config: proto.connection_config.into_iter().collect(),
@@ -54,22 +40,6 @@ impl TryFrom<write_buffer::WriteBufferConnection> for WriteBufferConnection {
     }
 }
-impl TryFrom<write_buffer::write_buffer_connection::Direction> for WriteBufferDirection {
-    type Error = FieldViolation;
-    fn try_from(
-        proto: write_buffer::write_buffer_connection::Direction,
-    ) -> Result<Self, Self::Error> {
-        use write_buffer::write_buffer_connection::Direction;
-        match proto {
-            Direction::Unspecified => Err(FieldViolation::required("")),
-            Direction::Write => Ok(Self::Write),
-            Direction::Read => Ok(Self::Read),
-        }
-    }
-}
 impl TryFrom<write_buffer::WriteBufferCreationConfig> for WriteBufferCreationConfig {
     type Error = FieldViolation;

View File

@@ -7,7 +7,7 @@ use generated_types::{
 };
 use influxdb_iox_client::{
     management::{Client, CreateDatabaseError},
-    router::generated_types::{write_buffer_connection, WriteBufferConnection},
+    router::generated_types::WriteBufferConnection,
 };
 use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
 use test_helpers::assert_contains;
@@ -77,7 +77,6 @@ async fn test_create_database_invalid_kafka() {
     let rules = DatabaseRules {
         name: "db_with_bad_kafka_address".into(),
         write_buffer_connection: Some(WriteBufferConnection {
-            direction: write_buffer_connection::Direction::Read.into(),
             r#type: "kafka".into(),
             connection: "i_am_not_a_kafka_server:1234".into(),
             ..Default::default()

View File

@@ -12,7 +12,6 @@ use arrow::{
 use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
 use futures::prelude::*;
 use influxdb_iox_client::management::generated_types::partition_template;
-use influxdb_iox_client::management::generated_types::write_buffer_connection;
 use influxdb_iox_client::management::generated_types::WriteBufferConnection;
 use influxdb_iox_client::management::CreateDatabaseError;
 use prost::Message;
@@ -605,7 +604,6 @@ pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> S
         .create_database(DatabaseRules {
             name: db_name.to_string(),
             write_buffer_connection: Some(WriteBufferConnection {
-                direction: write_buffer_connection::Direction::Read.into(),
                 r#type: "file".to_string(),
                 connection: write_buffer_path.display().to_string(),
                 creation_config: Some(WriteBufferCreationConfig {
@@ -721,7 +719,6 @@ pub fn wildcard_router_config(
     };
     let write_buffer_connection = WriteBufferConnection {
-        direction: write_buffer_connection::Direction::Write.into(),
         r#type: "file".to_string(),
         connection: write_buffer_path.display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {

View File

@@ -6,9 +6,7 @@ use crate::{
     end_to_end_cases::scenario::{rand_name, wildcard_router_config, DatabaseBuilder},
 };
 use arrow_util::assert_batches_sorted_eq;
-use generated_types::influxdata::iox::write_buffer::v1::{
-    write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
-};
+use generated_types::influxdata::iox::write_buffer::v1::WriteBufferConnection;
 use influxdb_iox_client::{
     delete::{
         generated_types::{Predicate, TimestampRange},
@@ -31,7 +29,6 @@ async fn reads_come_from_write_buffer() {
     let server = ServerFixture::create_shared(ServerType::Database).await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
-        direction: WriteBufferDirection::Read.into(),
         r#type: "file".to_string(),
         connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
@@ -133,7 +130,6 @@ async fn cant_write_to_db_reading_from_write_buffer() {
     let server = ServerFixture::create_shared(ServerType::Database).await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
-        direction: WriteBufferDirection::Read.into(),
         r#type: "file".to_string(),
         connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
@@ -184,7 +180,6 @@ async fn test_create_database_missing_write_buffer_sequencers() {
     let server = ServerFixture::create_shared(ServerType::Database).await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
-        direction: WriteBufferDirection::Read.into(),
         r#type: "file".to_string(),
         connection: write_buffer_dir.path().display().to_string(),
         ..Default::default()
@@ -243,7 +238,6 @@ pub async fn test_cross_write_buffer_tracing() {
         .unwrap();
     server_read.wait_server_initialized().await;
     let conn_read = WriteBufferConnection {
-        direction: WriteBufferDirection::Read.into(),
         r#type: "file".to_string(),
         connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {

View File

@@ -47,7 +47,7 @@ For the Kafka setup, you'll need to start two IOx servers, so you'll need to set
 for at least one of them. Here's an example of the two commands to run:
 ```
-cargo run --release -- run database --server-id 1
+cargo run --release -- run router --server-id 1
 cargo run --release -- run database --server-id 2 --api-bind 127.0.0.1:8084 --grpc-bind 127.0.0.1:8086
 ```

View File

@@ -10,9 +10,20 @@
     clippy::clone_on_ref_ptr
 )]
+use std::collections::HashMap;
 use clap::{App, Arg};
-use influxdb_iox_client::management::generated_types::*;
-use influxdb_iox_client::write::generated_types::*;
+use influxdb_iox_client::{
+    management::generated_types::{
+        database_rules, lifecycle_rules, partition_template, sink, DatabaseRules, KafkaProducer,
+        LifecycleRules, PartitionTemplate, RoutingConfig, Sink,
+    },
+    router::generated_types::{
+        write_sink, Matcher, MatcherToShard, Router, ShardConfig, WriteBufferConnection, WriteSink,
+        WriteSinkSet,
+    },
+    write::generated_types::{column, Column, DatabaseBatch, TableBatch, WriteRequest},
+};
 #[tokio::main]
 async fn main() {
@@ -69,33 +80,33 @@ Examples:
         .expect("KAFKA has a default value");
     // Edit these to whatever DatabaseRules you want to use
-    let writer_database_rules = DatabaseRules {
+    let router_config = Router {
         name: db_name.clone(),
-        partition_template: Some(PartitionTemplate {
-            parts: vec![partition_template::Part {
-                part: Some(partition_template::part::Part::Time(
-                    "%Y-%m-%d %H:00:00".into(),
-                )),
+        write_sharder: Some(ShardConfig {
+            specific_targets: vec![MatcherToShard {
+                matcher: Some(Matcher {
+                    table_name_regex: String::from(".*"),
+                }),
+                shard: 1,
             }],
+            hash_ring: None,
         }),
-        lifecycle_rules: Some(LifecycleRules {
-            immutable: true,
-            ..Default::default()
-        }),
-        worker_cleanup_avg_sleep: None,
-        routing_rules: Some(database_rules::RoutingRules::RoutingConfig(RoutingConfig {
-            sink: Some(Sink {
-                sink: Some(sink::Sink::Kafka(KafkaProducer {})),
-            }),
-        })),
-        write_buffer_connection: Some(WriteBufferConnection {
-            direction: write_buffer_connection::Direction::Write.into(),
+        write_sinks: HashMap::from([(
+            1,
+            WriteSinkSet {
+                sinks: vec![WriteSink {
+                    sink: Some(write_sink::Sink::WriteBuffer(WriteBufferConnection {
                         r#type: "kafka".to_string(),
                         connection: kafka.to_string(),
                         ..Default::default()
-        }),
+                    })),
+                    ignore_errors: false,
+                }],
+            },
+        )]),
+        query_sinks: None,
     };
-    let reader_database_rules = DatabaseRules {
+    let database_rules = DatabaseRules {
         name: db_name.clone(),
         partition_template: Some(PartitionTemplate {
             parts: vec![partition_template::Part {
@@ -122,7 +133,6 @@ Examples:
             }),
         })),
         write_buffer_connection: Some(WriteBufferConnection {
-            direction: write_buffer_connection::Direction::Read.into(),
             r#type: "kafka".to_string(),
             connection: kafka.to_string(),
             ..Default::default()
@@ -135,12 +145,12 @@ Examples:
         .build(writer_grpc_bind_addr)
         .await
         .unwrap();
-    let mut writer_management_client =
-        influxdb_iox_client::management::Client::new(writer_grpc_channel.clone());
-    writer_management_client
-        .create_database(writer_database_rules)
+    let mut writer_router_client =
+        influxdb_iox_client::router::Client::new(writer_grpc_channel.clone());
+    writer_router_client
+        .update_router(router_config)
         .await
-        .expect("create writer database failed");
+        .expect("create router failed");
     // Write a few points
     let mut write_client = influxdb_iox_client::write::Client::new(writer_grpc_channel);
@@ -158,7 +168,7 @@ Examples:
     let mut reader_management_client =
         influxdb_iox_client::management::Client::new(reader_grpc_channel.clone());
     reader_management_client
-        .create_database(reader_database_rules)
+        .create_database(database_rules)
         .await
         .expect("create reader database failed");

View File

@@ -524,7 +524,6 @@ def grpc_create_database(router_id, writer_id):
                 'sinks': [
                     {
                         'write_buffer': {
-                            'direction': 'DIRECTION_WRITE',
                             'type': 'kafka',
                             'connection': '127.0.0.1:9093',
                             'connection_config': {},
@@ -565,7 +564,6 @@ def grpc_create_database(router_id, writer_id):
         'routing_config': {'sink': {'kafka': {}}},
         'worker_cleanup_avg_sleep': '500s',
         'write_buffer_connection': {
-            'direction': 'DIRECTION_READ',
             'type': 'kafka',
             'connection': '127.0.0.1:9093',
             'connection_config': {},

View File

@@ -204,7 +204,6 @@ impl WriteSinkSet {
 #[cfg(test)]
 mod tests {
-    use data_types::write_buffer::WriteBufferDirection;
     use dml::DmlWrite;
     use mutable_batch_lp::lines_to_batches;
     use time::SystemProvider;
@@ -267,7 +266,6 @@ mod tests {
         // write buffer, do NOT ignore errors
         let write_buffer_cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: String::from("mock"),
             connection: String::from("failing_wb"),
             ..Default::default()

View File

@@ -8,7 +8,7 @@ use crate::{
     rules::{PersistedDatabaseRules, ProvidedDatabaseRules},
     ApplicationState, Db,
 };
-use data_types::{server_id::ServerId, write_buffer::WriteBufferDirection, DatabaseName};
+use data_types::{server_id::ServerId, DatabaseName};
 use dml::DmlOperation;
 use futures::{
     future::{BoxFuture, FusedFuture, Shared},
@@ -1388,7 +1388,7 @@ impl DatabaseStateCatalogLoaded {
         let trace_collector = shared.application.trace_collector();
         let write_buffer_factory = shared.application.write_buffer_factory();
         let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
-            Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => {
+            Some(connection) => {
                 let mut consumer = write_buffer_factory
                     .new_config_read(
                         shared.config.server_id,
@@ -1452,7 +1452,7 @@ mod tests {
     use data_types::{
         database_rules::{PartitionTemplate, TemplatePart},
         sequence::Sequence,
-        write_buffer::{WriteBufferConnection, WriteBufferDirection},
+        write_buffer::WriteBufferConnection,
     };
     use std::{num::NonZeroU32, time::Instant};
     use uuid::Uuid;
@@ -1697,7 +1697,6 @@ mod tests {
             routing_rules: None,
             worker_cleanup_avg_sleep: Duration::from_secs(2),
             write_buffer_connection: Some(WriteBufferConnection {
-                direction: WriteBufferDirection::Read,
                 type_: "mock".to_string(),
                 connection: "my_mock".to_string(),
                 ..Default::default()

View File

@@ -1215,7 +1215,7 @@ mod tests {
     use data_types::{
         chunk_metadata::{ChunkAddr, ChunkStorage},
         database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
-        write_buffer::{WriteBufferConnection, WriteBufferDirection},
+        write_buffer::WriteBufferConnection,
     };
     use dml::DmlWrite;
     use iox_object_store::IoxObjectStore;
@@ -2005,7 +2005,6 @@ mod tests {
             routing_rules: None,
             worker_cleanup_avg_sleep: Duration::from_secs(2),
             write_buffer_connection: Some(WriteBufferConnection {
-                direction: WriteBufferDirection::Write,
                 type_: "mock".to_string(),
                 connection: "my_mock".to_string(),
                 ..Default::default()

View File

@@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
 use arrow_util::assert_batches_eq;
 use data_types::database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart};
-use data_types::write_buffer::{WriteBufferConnection, WriteBufferDirection};
+use data_types::write_buffer::WriteBufferConnection;
 use data_types::{sequence::Sequence, server_id::ServerId, DatabaseName};
 use query::QueryDatabase;
 use server::{
@@ -61,7 +61,6 @@ async fn write_buffer_reads_wait_for_compaction() {
             ..Default::default()
         },
         write_buffer_connection: Some(WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: "my_mock".to_string(),
             ..Default::default()

View File

@@ -14,8 +14,7 @@ use data_types::timestamp::TimestampRange;
 use data_types::DatabaseName;
 use dml::{DmlDelete, DmlOperation, DmlWrite};
 use generated_types::influxdata::iox::{
-    management::v1::DatabaseRules,
-    write_buffer::v1::{write_buffer_connection::Direction, WriteBufferConnection},
+    management::v1::DatabaseRules, write_buffer::v1::WriteBufferConnection,
 };
 use mutable_batch_lp::lines_to_batches;
 use query::exec::ExecutionContextProvider;
@@ -58,8 +57,7 @@ impl DistributedTest {
             .write_buffer_factory()
             .register_mock("my_mock".to_string(), write_buffer_state);
-        let mut write_buffer_connection = WriteBufferConnection {
-            direction: Direction::Write as _,
+        let write_buffer_connection = WriteBufferConnection {
             r#type: "mock".to_string(),
             connection: "my_mock".to_string(),
             connection_config: Default::default(),
@@ -107,8 +105,6 @@ impl DistributedTest {
         let consumer_id = ServerId::new(NonZeroU32::new(2).unwrap());
         let consumer = make_initialized_server(consumer_id, Arc::clone(&application)).await;
-        write_buffer_connection.direction = Direction::Read as _;
         let consumer_db = consumer
             .create_database(
                 ProvidedDatabaseRules::new_rules(DatabaseRules {

View File

@@ -21,7 +21,6 @@
     "routing_config": {"sink": {"kafka": {}}},
     "worker_cleanup_avg_sleep": "500s",
     "write_buffer_connection": {
-        "direction": "DIRECTION_READ",
         "type": "kafka",
         "connection": "redpanda-service:9093",
         "connection_config": {},

View File

@@ -17,7 +17,6 @@
     "sinks": [
         {
             "write_buffer": {
-                "direction": "DIRECTION_WRITE",
                 "type": "kafka",
                 "connection": "redpanda-service:9093",
                 "connection_config": {},

View File

@@ -5,10 +5,7 @@ use std::{
     sync::Arc,
 };
-use data_types::{
-    server_id::ServerId,
-    write_buffer::{WriteBufferConnection, WriteBufferDirection},
-};
+use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
 use time::TimeProvider;
 use trace::TraceCollector;
@@ -94,16 +91,11 @@ impl WriteBufferConfigFactory {
     /// Returns a new [`WriteBufferWriting`] for the provided [`WriteBufferConnection`]
     ///
-    /// # Panics
-    /// When the provided connection is not [`WriteBufferDirection::Write`]
-    ///
     pub async fn new_config_write(
         &self,
         db_name: &str,
         cfg: &WriteBufferConnection,
     ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
-        assert_eq!(cfg.direction, WriteBufferDirection::Write);
         let writer = match &cfg.type_[..] {
             "file" => {
                 let root = PathBuf::from(&cfg.connection);
@@ -151,9 +143,6 @@ impl WriteBufferConfigFactory {
     }
     /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
-    ///
-    /// # Panics
-    /// When the provided connection is not [`WriteBufferDirection::Read`]
     pub async fn new_config_read(
         &self,
         server_id: ServerId,
@@ -161,8 +150,6 @@ impl WriteBufferConfigFactory {
         trace_collector: Option<&Arc<dyn TraceCollector>>,
         cfg: &WriteBufferConnection,
     ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
-        assert_eq!(cfg.direction, WriteBufferDirection::Read);
         let reader = match &cfg.type_[..] {
             "file" => {
                 let root = PathBuf::from(&cfg.connection);
@@ -228,7 +215,6 @@ mod tests {
         let factory = factory();
         let db_name = DatabaseName::try_from("foo").unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "file".to_string(),
             connection: root.path().display().to_string(),
             creation_config: Some(WriteBufferCreationConfig::default()),
@@ -248,7 +234,6 @@ mod tests {
         let factory = factory();
         let db_name = DatabaseName::try_from("foo").unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "file".to_string(),
             connection: root.path().display().to_string(),
             creation_config: Some(WriteBufferCreationConfig::default()),
@@ -269,7 +254,6 @@ mod tests {
         let factory = factory();
         let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "kafka".to_string(),
             connection: conn,
             creation_config: Some(WriteBufferCreationConfig::default()),
@@ -291,7 +275,6 @@ mod tests {
         let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "kafka".to_string(),
             connection: conn,
             creation_config: Some(WriteBufferCreationConfig::default()),
@@ -316,7 +299,6 @@ mod tests {
         let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
@@ -330,7 +312,6 @@ mod tests {
         // will error when state is unknown
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()
@@ -354,7 +335,6 @@ mod tests {
         let server_id = ServerId::try_from(1).unwrap();
         let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
@@ -368,7 +348,6 @@ mod tests {
         // will error when state is unknown
        let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()
@@ -389,7 +368,6 @@ mod tests {
         let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
@@ -403,7 +381,6 @@ mod tests {
         // will error when state is unknown
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()
@@ -426,7 +403,6 @@ mod tests {
         let db_name = DatabaseName::new("foo").unwrap();
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
@@ -440,7 +416,6 @@ mod tests {
         // will error when state is unknown
         let cfg = WriteBufferConnection {
-            direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()