Merge remote-tracking branch 'origin/main' into dom/dml-delete-namespace-id

commit 09e9b69b85
@@ -1129,12 +1129,10 @@ dependencies = [
[[package]]
name = "datafusion"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "ahash 0.8.0",
 "arrow",
 "arrow-buffer",
 "arrow-schema",
 "async-compression",
 "async-trait",
 "bytes",

@@ -1149,7 +1147,6 @@ dependencies = [
 "flate2",
 "futures",
 "glob",
 "half 2.1.0",
 "hashbrown",
 "itertools",
 "lazy_static",

@@ -1176,7 +1173,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "arrow",
 "chrono",

@@ -1189,7 +1186,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "ahash 0.8.0",
 "arrow",

@@ -1201,7 +1198,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "arrow",
 "async-trait",

@@ -1216,32 +1213,37 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "ahash 0.8.0",
 "arrow",
 "arrow-buffer",
 "arrow-schema",
 "blake2",
 "blake3",
 "chrono",
 "datafusion-common",
 "datafusion-expr",
 "datafusion-row",
 "half 2.1.0",
 "hashbrown",
 "itertools",
 "lazy_static",
 "md-5",
 "num-traits",
 "ordered-float 3.3.0",
 "paste",
 "rand",
 "regex",
 "sha2",
 "unicode-segmentation",
 "uuid",
]

[[package]]
name = "datafusion-proto"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "arrow",
 "datafusion",

@@ -1255,7 +1257,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "arrow",
 "datafusion-common",

@@ -1266,7 +1268,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "13.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=2b08a43b82127ef144204e5999dd2730fa1c4756#2b08a43b82127ef144204e5999dd2730fa1c4756"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=761e1671bd3c4988d21a38bb19e50bdac6cfaa61#761e1671bd3c4988d21a38bb19e50bdac6cfaa61"
dependencies = [
 "arrow",
 "datafusion-common",

@@ -1584,13 +1586,13 @@ dependencies = [

[[package]]
name = "futures-intrusive"
-version = "0.4.1"
+version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b6bdbb8c5a42b2bb5ee8dd9dc2c7d73ce3e15d26dfe100fb347ffa3f58c672b"
+checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
 "futures-core",
 "lock_api",
- "parking_lot 0.12.1",
+ "parking_lot 0.11.2",
]

[[package]]

@@ -110,8 +110,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "25.0.0" }
arrow-flight = { version = "25.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="2b08a43b82127ef144204e5999dd2730fa1c4756", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="2b08a43b82127ef144204e5999dd2730fa1c4756" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="761e1671bd3c4988d21a38bb19e50bdac6cfaa61", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="761e1671bd3c4988d21a38bb19e50bdac6cfaa61" }
parquet = { version = "25.0.0" }

# This profile optimizes for runtime performance and small binary size at the expense of longer

@@ -166,6 +166,16 @@ macro_rules! gen_compactor_config {
            action
        )]
        pub max_num_compacting_files: usize,
+
+        /// Number of minutes without a write to a partition before it is considered cold
+        /// and thus a candidate for compaction
+        #[clap(
+            long = "compaction-minutes-without-new-writes-to-be-cold",
+            env = "INFLUXDB_IOX_COMPACTION_MINUTES_WITHOUT_NEW_WRITE_TO_BE_COLD",
+            default_value = "480",
+            action
+        )]
+        pub minutes_without_new_writes_to_be_cold: u64,
        }
    };
}

@@ -193,6 +203,7 @@ impl CompactorOnceConfig {
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: self
                .min_num_rows_allocated_per_record_batch_to_datafusion_plan,
            max_num_compacting_files: self.max_num_compacting_files,
+            minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold,
        }
    }
}

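The new flag follows the same clap pattern as the existing compactor options: a long CLI flag, an environment variable, and a default of 480 minutes (8 hours). As a rough stand-alone illustration only — outside the real gen_compactor_config! macro, with just this one field, and assuming clap 3 with the derive and env features — the pattern looks like:

    // Minimal sketch of the clap pattern above; not the actual IOx config struct.
    use clap::Parser;

    #[derive(Debug, Parser)]
    struct CompactorColdConfig {
        /// Number of minutes without a write to a partition before it is
        /// considered cold and thus a candidate for compaction.
        #[clap(
            long = "compaction-minutes-without-new-writes-to-be-cold",
            env = "INFLUXDB_IOX_COMPACTION_MINUTES_WITHOUT_NEW_WRITE_TO_BE_COLD",
            default_value = "480",
            action
        )]
        minutes_without_new_writes_to_be_cold: u64,
    }

    fn main() {
        let config = CompactorColdConfig::parse();
        println!("{config:?}");
    }
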
@@ -700,6 +700,7 @@ mod tests {
            memory_budget_bytes: 100_000_000,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        }
    }

@@ -275,14 +275,15 @@ impl Compactor {
            ("partition_type", compaction_type.into()),
        ]);

-        let time_8_hours_ago = Timestamp::from(self.time_provider.hours_ago(8));
+        let minutes = self.config.minutes_without_new_writes_to_be_cold;
+        let time_in_the_past = Timestamp::from(self.time_provider.minutes_ago(minutes));

        let mut repos = self.catalog.repositories().await;
        let mut partitions = repos
            .parquet_files()
            .most_cold_files_partitions(
                *shard_id,
-                time_8_hours_ago,
+                time_in_the_past,
                max_num_partitions_per_shard,
            )
            .await

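In other words, the cold-partition cutoff is no longer hard-coded to eight hours; it is derived from the configured number of minutes. A rough stand-alone sketch of that computation, using chrono directly in place of IOx's TimeProvider (which is assumed here to behave like "now minus N minutes"):

    // Sketch only: derive the "cold" cutoff from the configured minutes.
    use chrono::{Duration, Utc};

    fn cold_cutoff_nanos(minutes_without_new_writes_to_be_cold: u64) -> i64 {
        let cutoff = Utc::now() - Duration::minutes(minutes_without_new_writes_to_be_cold as i64);
        cutoff.timestamp_nanos()
    }

    fn main() {
        // With the default of 480 minutes, partitions without a write in the
        // last 8 hours become candidates for cold compaction.
        println!("{}", cold_cutoff_nanos(480));
    }
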
@@ -688,6 +689,7 @@ pub mod tests {
            memory_budget_bytes: 10 * 1024 * 1024,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        }
    }

@@ -776,7 +778,9 @@ pub mod tests {

        // Create a compactor
        let time_provider = Arc::new(SystemProvider::new());
-        let config = make_compactor_config();
+        let mut config = make_compactor_config();
+        // 8 hours to be cold
+        config.minutes_without_new_writes_to_be_cold = 8 * 60;
        let compactor = Compactor::new(
            vec![shard.id, another_shard.id],
            Arc::clone(&catalog.catalog),

@@ -156,6 +156,9 @@ pub struct CompactorConfig {
    /// Due to limit in fan-in of datafusion plan, we need to limit the number of files to compact
    /// per partition.
    pub max_num_compacting_files: usize,
+
+    /// Minutes without any new data before a partition is considered cold
+    pub minutes_without_new_writes_to_be_cold: u64,
}

/// How long to pause before checking for more work again if there was

@@ -494,6 +494,7 @@ mod tests {
            memory_budget_bytes: 10 * 1024 * 1024,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        };
        let compactor = Arc::new(Compactor::new(
            vec![shard1.shard.id, shard2.shard.id],

@@ -561,6 +561,7 @@ pub mod tests {
            memory_budget_bytes: budget,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 2,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        }
    }

@@ -939,6 +940,7 @@ pub mod tests {
            memory_budget_bytes: 100_000_000,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        };

        let metrics = Arc::new(metric::Registry::new());

@@ -426,6 +426,7 @@ impl Config {
            memory_budget_bytes: 300_000,
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
            max_num_compacting_files: 20,
+            minutes_without_new_writes_to_be_cold: 10,
        };

        let querier_config = QuerierConfig {

@@ -199,7 +199,8 @@ agents = [{name = "foo", sampling_interval = "1s", count = 3}]
    group.bench_function("single agent with basic configuration", |b| {
        b.iter(|| {
            agent.reset_current_date_time(0);
-            let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap();
+            let points_writer =
+                Arc::new(points_writer.build_for_agent("foo", "foo", "foo").unwrap());
            let r = block_on(agent.generate_all(points_writer, 1, Arc::clone(&counter)));
            let n_points = r.expect("Could not generate data");
            assert_eq!(n_points, expected_points as usize);

@@ -134,7 +134,7 @@ impl Agent {
    /// called `batch_size` times before writing. Meant to be called in a `tokio::task`.
    pub async fn generate_all(
        &mut self,
-        mut points_writer: PointsWriter,
+        points_writer: Arc<PointsWriter>,
        batch_size: usize,
        counter: Arc<AtomicU64>,
    ) -> Result<usize> {

@@ -153,7 +153,7 @@ pub async fn generate(
    )
    .context(CouldNotCreateAgentSnafu)?;

-    info!(
+    println!(
        "Configuring {} agents of \"{}\" to write data \
        to org {} and bucket {} (database {})",
        agent_assignment.count,

@@ -163,12 +163,15 @@ pub async fn generate(
        database_assignments.database,
    );

-    for mut agent in agents.into_iter() {
-        let agent_points_writer = points_writer_builder
+    let agent_points_writer = Arc::new(
+        points_writer_builder
            .build_for_agent(&agent_assignment.spec.name, org, bucket)
-            .context(CouldNotCreateAgentWriterSnafu)?;
+            .context(CouldNotCreateAgentWriterSnafu)?,
+    );

+    for mut agent in agents.into_iter() {
        let lock_ref = Arc::clone(&lock);
+        let agent_points_writer = Arc::clone(&agent_points_writer);

        let total_rows = Arc::clone(&total_rows);
        handles.push(tokio::task::spawn(async move {

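The reworked loop above builds the per-agent PointsWriter once, wraps it in an Arc, and clones that Arc into every spawned task, which is why Agent::generate_all now takes Arc<PointsWriter> instead of an owned, mutable writer. A minimal sketch of the sharing pattern in isolation (Writer is a hypothetical stand-in, not the real PointsWriter, and tokio with the macros and rt-multi-thread features is assumed):

    // Sketch of the Arc-per-task sharing pattern, not the real data generator API.
    use std::sync::Arc;

    struct Writer;

    impl Writer {
        // Takes &self, so the writer can be shared without a mutable borrow.
        async fn write(&self, line: &str) {
            println!("{line}");
        }
    }

    #[tokio::main]
    async fn main() {
        let writer = Arc::new(Writer);
        let mut handles = Vec::new();

        for agent_id in 0..3 {
            let writer = Arc::clone(&writer);
            handles.push(tokio::spawn(async move {
                writer.write(&format!("agent {agent_id}")).await;
            }));
        }

        for handle in handles {
            handle.await.expect("task panicked");
        }
    }
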
@@ -10,14 +10,12 @@ use parquet_file::{metadata::IoxMetadata, serialize};
use schema::Projection;
use snafu::{ensure, ResultExt, Snafu};
#[cfg(test)]
use std::{
    collections::BTreeMap,
    sync::{Arc, Mutex},
};
use std::{collections::BTreeMap, sync::Arc};
use std::{
    fs::{self, File, OpenOptions},
    io::{BufWriter, Write},
    path::{Path, PathBuf},
    sync::Mutex,
};

/// Errors that may happen while writing points.

@@ -243,7 +241,7 @@ impl PointsWriterBuilder {
            .open(&filename)
            .context(CantOpenLineProtocolFileSnafu { filename })?;

-        let file = BufWriter::new(file);
+        let file = Mutex::new(BufWriter::new(file));

        InnerPointsWriter::File { file }
    }

@@ -279,7 +277,7 @@ pub struct PointsWriter {
impl PointsWriter {
    /// Write these points
    pub async fn write_points(
-        &mut self,
+        &self,
        points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
    ) -> Result<()> {
        self.inner_writer.write_points(points).await

@@ -294,7 +292,7 @@ enum InnerPointsWriter {
        bucket: String,
    },
    File {
-        file: BufWriter<File>,
+        file: Mutex<BufWriter<File>>,
    },
    ParquetFile {
        dir_path: PathBuf,

@@ -310,7 +308,7 @@ enum InnerPointsWriter {

impl InnerPointsWriter {
    async fn write_points(
-        &mut self,
+        &self,
        points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
    ) -> Result<()> {
        match self {

@@ -326,6 +324,7 @@ impl InnerPointsWriter {
            }
            Self::File { file } => {
                for point in points {
+                    let mut file = file.lock().expect("Should be able to get lock");
                    point
                        .write_data_point_to(&mut *file)
                        .context(CantWriteToLineProtocolFileSnafu)?;

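Because write_points now takes &self, the file-backed variant needs interior mutability; the hunks above wrap the BufWriter<File> in a Mutex and lock it for each write. A minimal sketch of that pattern (FileWriter is a hypothetical stand-in, not the real InnerPointsWriter):

    // Sketch of the Mutex<BufWriter<File>> locking pattern shown above.
    use std::fs::File;
    use std::io::{BufWriter, Write};
    use std::sync::Mutex;

    struct FileWriter {
        file: Mutex<BufWriter<File>>,
    }

    impl FileWriter {
        fn create(path: &str) -> std::io::Result<Self> {
            Ok(Self {
                file: Mutex::new(BufWriter::new(File::create(path)?)),
            })
        }

        // &self is enough: the Mutex hands out exclusive access per write.
        fn write_line(&self, line: &str) -> std::io::Result<()> {
            let mut file = self.file.lock().expect("should be able to get lock");
            writeln!(file, "{line}")
        }
    }

    fn main() -> std::io::Result<()> {
        let writer = FileWriter::create("points.lp")?;
        writer.write_line("cpu,host=a usage=0.5 1234567890")?;
        Ok(())
    }
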
@@ -389,7 +388,7 @@ impl InnerPointsWriter {
                }
            }
            #[cfg(test)]
-            Self::Vec(ref mut vec) => {
+            Self::Vec(vec) => {
                let vec_ref = Arc::clone(vec);
                let mut vec = vec_ref.lock().expect("Should be able to get lock");
                for point in points {

@@ -210,6 +210,7 @@ pub async fn build_compactor_from_config(
        memory_budget_bytes,
        min_num_rows_allocated_per_record_batch_to_datafusion_plan,
        max_num_compacting_files,
+        minutes_without_new_writes_to_be_cold,
        ..
    } = compactor_config;

@@ -223,6 +224,7 @@ pub async fn build_compactor_from_config(
        memory_budget_bytes,
        min_num_rows_allocated_per_record_batch_to_datafusion_plan,
        max_num_compacting_files,
+        minutes_without_new_writes_to_be_cold,
    };

    Ok(compactor::compact::Compactor::new(

@@ -89,8 +89,7 @@ async fn sql_select_with_schema_merge() {

#[tokio::test]
async fn sql_select_with_schema_merge_nonexistent_column() {
-    let expected_error = "Schema error: No field named 'foo'. Valid fields are 'cpu.host', \
-        'cpu.region', 'cpu.system', 'cpu.time', 'cpu.user'.";
+    let expected_error = "Schema error: No field named 'foo'";
    run_sql_error_test_case(
        MultiChunkSchemaMerge {},
        "SELECT * from cpu where foo = 8",

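The expected error text is shortened to a stable prefix, presumably because the full message (including the list of valid fields) depends on the DataFusion version being bumped in this same change. A hedged sketch of that style of assertion, separate from the real run_sql_error_test_case helper:

    // Sketch: match on a stable prefix instead of the full DataFusion message.
    fn assert_schema_error(actual: &str) {
        let expected_prefix = "Schema error: No field named 'foo'";
        assert!(
            actual.starts_with(expected_prefix),
            "unexpected error message: {actual}"
        );
    }

    fn main() {
        assert_schema_error(
            "Schema error: No field named 'foo'. Valid fields are 'cpu.host', 'cpu.time'.",
        );
    }
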
@@ -25,7 +25,7 @@ bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["std"] }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2b08a43b82127ef144204e5999dd2730fa1c4756", features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "761e1671bd3c4988d21a38bb19e50bdac6cfaa61", features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }

@@ -33,6 +33,7 @@ pub struct WriteBufferConnection {
    pub type_: String,

    /// Connection string, depends on [`type_`](Self::type_).
+    /// When Kafka type is selected, multiple bootstrap_broker can be separated by commas.
    pub connection: String,

    /// Special configs to be applied when establishing the connection.

@@ -431,7 +431,8 @@ async fn setup_topic(
    partitions: Option<Range<i32>>,
) -> Result<BTreeMap<ShardIndex, PartitionClient>> {
    let client_config = ClientConfig::try_from(connection_config)?;
-    let mut client_builder = ClientBuilder::new(vec![conn]);
+    let mut client_builder =
+        ClientBuilder::new(conn.split(',').map(|s| s.trim().to_owned()).collect());
    if let Some(client_id) = client_config.client_id {
        client_builder = client_builder.client_id(client_id);
    }

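The new ClientBuilder call above turns the single connection string into a list of bootstrap brokers by splitting on commas and trimming whitespace, matching the updated doc comment on WriteBufferConnection::connection. In isolation, that parsing looks roughly like this (a sketch of the string handling only, not the rskafka client API):

    // Sketch of the connection-string parsing shown above.
    fn bootstrap_brokers(conn: &str) -> Vec<String> {
        conn.split(',').map(|s| s.trim().to_owned()).collect()
    }

    fn main() {
        assert_eq!(
            bootstrap_brokers("kafka-1:9092, kafka-2:9092"),
            vec!["kafka-1:9092", "kafka-2:9092"]
        );
    }
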