feat: Set different write buffer config defaults for all-in-one mode

Connects to #4399.

Only the file-based write buffer is supported. If `--data-dir` is specified,
store the write buffer there; otherwise, store it in a temporary directory so it is ephemeral.
pull/24376/head
Carol (Nichols || Goulding) 2022-04-25 17:14:11 -04:00
parent 0cfd16263c
commit 941dd12dd1
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
3 changed files with 43 additions and 43 deletions

View File

@ -3,26 +3,20 @@ name = "clap_blocks"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "3", features = ["derive", "env"] }
data_types = { path = "../data_types" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
trace_exporters = { path = "../trace_exporters" }
trace = { path = "../trace" }
iox_time = { path = "../iox_time" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
write_buffer = { path = "../write_buffer" }
clap = { version = "3", features = ["derive", "env"] }
futures = "0.3"
snafu = "0.7"
tempfile = "3.1.0"
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
tempfile = "3.1.0"
write_buffer = { path = "../write_buffer" }

View File

@ -1,6 +1,7 @@
use data_types::write_buffer::{WriteBufferConnection, WriteBufferCreationConfig};
use iox_time::SystemProvider;
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf, sync::Arc};
use tempfile::TempDir;
use trace::TraceCollector;
use write_buffer::{
config::WriteBufferConfigFactory,
@ -53,6 +54,27 @@ pub struct WriteBufferConfig {
}
impl WriteBufferConfig {
/// Create a new instance for all-in-one mode, only allowing some arguments.
pub fn new(topic: &str, database_directory: Option<PathBuf>) -> Self {
// Resolve where the file-based write buffer lives: the caller-supplied
// data directory when present, or a freshly created temporary directory
// (kept alive via `into_path`) so the buffer is ephemeral.
let connection_string = match database_directory {
Some(dir) => dir.display().to_string(),
None => TempDir::new()
.expect("Creating a temporary directory should work")
.into_path()
.display()
.to_string(),
};

Self {
type_: "file".to_string(),
connection_string,
topic: topic.to_string(),
connection_config: Default::default(),
// All-in-one mode always auto-creates a single topic.
auto_create_topics: Some(NonZeroU32::new(1).unwrap()),
}
}
/// Initialize a [`WriteBufferWriting`].
pub async fn writing(
&self,

View File

@ -18,7 +18,7 @@ use ioxd_router2::create_router2_server_type;
use object_store::{DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use query::exec::Executor;
use std::{num::NonZeroU32, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use thiserror::Error;
use trace_exporters::TracingConfig;
use trogging::cli::LoggingConfig;
@ -38,6 +38,9 @@ pub const DEFAULT_INGESTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8083";
/// The default bind address for the Compactor gRPC (chosen to match default gRPC addr)
pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084";
// If you want this level of control, you should be instantiating the services individually
const QUERY_POOL_NAME: &str = "iox-shared";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
@ -161,9 +164,6 @@ pub struct Config {
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
/// The ingester will continue to pull data and buffer it from the write buffer
/// as long as it is below this size. If it hits this size it will pause
/// ingest from the write buffer until persistence goes below this threshold.
@ -266,7 +266,6 @@ impl Config {
max_http_request_size,
database_directory,
catalog_dsn,
write_buffer_config,
pause_ingest_size_bytes,
persist_memory_threshold_bytes,
persist_partition_size_threshold_bytes,
@ -279,7 +278,8 @@ impl Config {
compactor_grpc_bind_address,
} = self;
let object_store_config = ObjectStoreConfig::new(database_directory);
let object_store_config = ObjectStoreConfig::new(database_directory.clone());
let write_buffer_config = WriteBufferConfig::new(QUERY_POOL_NAME, database_directory);
let router_run_config = RunConfig::new(
logging_config,
@ -317,10 +317,10 @@ impl Config {
};
// create a CompactorConfig for the all in one server based on
// settings from other configs. Cant use `#clap(flatten)` as the
// parameters are redundant with ingesters
// settings from other configs. Can't use `#clap(flatten)` as the
// parameters are redundant with ingester's
let compactor_config = CompactorConfig {
topic: write_buffer_config.topic().to_string(),
topic: QUERY_POOL_NAME.to_string(),
write_buffer_partition_range_start,
write_buffer_partition_range_end,
split_percentage: 90,
@ -364,27 +364,11 @@ pub async fn command(config: Config) -> Result<()> {
ingester_run_config,
compactor_run_config,
catalog_dsn,
mut write_buffer_config,
write_buffer_config,
ingester_config,
compactor_config,
} = config.specialize();
// Ensure at least one topic is automatically created in all in one mode
write_buffer_config.set_auto_create_topics(Some(
write_buffer_config.auto_create_topics().unwrap_or_else(|| {
let default_config = NonZeroU32::new(1).unwrap();
info!(
?default_config,
"Automatically configuring creation of a single topic"
);
default_config
}),
));
// If you want this level of control, you should be instantiating the
// services individually
let query_pool_name = "iox-shared";
let metrics = Arc::new(metric::Registry::default());
let catalog = catalog_dsn
@ -401,7 +385,7 @@ pub async fn command(config: Config) -> Result<()> {
.repositories()
.await
.kafka_topics()
.create_or_get(query_pool_name)
.create_or_get(QUERY_POOL_NAME)
.await?;
let object_store: Arc<DynObjectStore> = Arc::new(
@ -427,7 +411,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&catalog),
Arc::clone(&object_store),
&write_buffer_config,
query_pool_name,
QUERY_POOL_NAME,
1_000, // max 1,000 concurrent HTTP requests
)
.await?;