fix: rustfmt
parent
fbae4282df
commit
fe9c474620
|
|
@ -259,8 +259,7 @@ impl Compactor {
|
|||
// to prioritize partitions
|
||||
min_recent_ingested_files: usize,
|
||||
) -> Result<Vec<PartitionParam>> {
|
||||
let mut candidates =
|
||||
Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
|
||||
for shard_id in &self.shards {
|
||||
|
|
@ -322,8 +321,7 @@ impl Compactor {
|
|||
// Max number of cold partitions per shard we want to compact
|
||||
max_num_partitions_per_shard: usize,
|
||||
) -> Result<Vec<PartitionParam>> {
|
||||
let mut candidates =
|
||||
Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
|
||||
for shard_id in &self.shards {
|
||||
|
|
|
|||
|
|
@ -242,10 +242,7 @@ mod tests {
|
|||
table.create_column("tag2", ColumnType::Tag).await;
|
||||
table.create_column("tag3", ColumnType::Tag).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
let partition = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("part")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer).create_partition("part").await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let config = make_compactor_config();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
|
|
@ -463,10 +460,7 @@ mod tests {
|
|||
table.create_column("tag2", ColumnType::Tag).await;
|
||||
table.create_column("tag3", ColumnType::Tag).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
let partition = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("part")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer).create_partition("part").await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
|
||||
let config = make_compactor_config();
|
||||
|
|
@ -650,10 +644,7 @@ mod tests {
|
|||
table.create_column("tag2", ColumnType::Tag).await;
|
||||
table.create_column("tag3", ColumnType::Tag).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
let partition = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("part")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer).create_partition("part").await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
|
||||
let config = make_compactor_config();
|
||||
|
|
|
|||
|
|
@ -319,10 +319,7 @@ pub(crate) async fn compact_parquet_files(
|
|||
|
||||
info!(?partition_id, "compaction complete");
|
||||
|
||||
let attributes = Attributes::from([(
|
||||
"shard_id",
|
||||
format!("{}", partition.shard_id()).into(),
|
||||
)]);
|
||||
let attributes = Attributes::from([("shard_id", format!("{}", partition.shard_id()).into())]);
|
||||
let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes);
|
||||
for size in file_sizes {
|
||||
compaction_input_file_bytes.record(size as u64);
|
||||
|
|
|
|||
|
|
@ -476,13 +476,7 @@ impl ShardData {
|
|||
};
|
||||
|
||||
namespace_data
|
||||
.buffer_operation(
|
||||
dml_operation,
|
||||
shard_id,
|
||||
catalog,
|
||||
lifecycle_handle,
|
||||
executor,
|
||||
)
|
||||
.buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor)
|
||||
.await
|
||||
}
|
||||
|
||||
|
|
@ -944,12 +938,8 @@ impl TableData {
|
|||
}
|
||||
}
|
||||
|
||||
let should_pause = lifecycle_handle.log_write(
|
||||
partition_data.id,
|
||||
shard_id,
|
||||
sequence_number,
|
||||
batch.size(),
|
||||
);
|
||||
let should_pause =
|
||||
lifecycle_handle.log_write(partition_data.id, shard_id, sequence_number, batch.size());
|
||||
partition_data.buffer_write(sequence_number, batch)?;
|
||||
|
||||
Ok(should_pause)
|
||||
|
|
|
|||
|
|
@ -496,7 +496,8 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
if has_measurement && shard.min_unpersisted_sequence_number == SequenceNumber::new(9)
|
||||
if has_measurement
|
||||
&& shard.min_unpersisted_sequence_number == SequenceNumber::new(9)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -616,12 +616,9 @@ mod tests {
|
|||
assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1));
|
||||
|
||||
// log another write for different partition using a different handle
|
||||
assert!(!m.handle().log_write(
|
||||
PartitionId::new(2),
|
||||
shard_id,
|
||||
SequenceNumber::new(3),
|
||||
3
|
||||
));
|
||||
assert!(!m
|
||||
.handle()
|
||||
.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3));
|
||||
|
||||
let stats = m.stats();
|
||||
assert_eq!(stats.total_bytes, 5);
|
||||
|
|
@ -890,12 +887,7 @@ mod tests {
|
|||
let partition_id = PartitionId::new(1);
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8);
|
||||
h.log_write(
|
||||
PartitionId::new(2),
|
||||
shard_id,
|
||||
SequenceNumber::new(2),
|
||||
13,
|
||||
);
|
||||
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13);
|
||||
|
||||
m.maybe_persist(&persister).await;
|
||||
|
||||
|
|
@ -912,12 +904,7 @@ mod tests {
|
|||
|
||||
// add that partition back in over size
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20);
|
||||
h.log_write(
|
||||
PartitionId::new(2),
|
||||
shard_id,
|
||||
SequenceNumber::new(4),
|
||||
21,
|
||||
);
|
||||
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21);
|
||||
|
||||
// both partitions should now need to be persisted to bring us below the mem threshold of 20.
|
||||
m.maybe_persist(&persister).await;
|
||||
|
|
|
|||
|
|
@ -860,13 +860,13 @@ pub(crate) async fn make_one_partition_with_tombstones(
|
|||
let mut seq_num = seq_num.get();
|
||||
seq_num += 1;
|
||||
let ts = create_tombstone(
|
||||
2, // tombstone id
|
||||
table_id.get(), // table id
|
||||
2, // tombstone id
|
||||
table_id.get(), // table id
|
||||
shard_id.get(), // sequencer id
|
||||
seq_num, // delete's seq_number
|
||||
10, // min time of data to get deleted
|
||||
50, // max time of data to get deleted
|
||||
"city=Boston", // delete predicate
|
||||
seq_num, // delete's seq_number
|
||||
10, // min time of data to get deleted
|
||||
50, // max time of data to get deleted
|
||||
"city=Boston", // delete predicate
|
||||
);
|
||||
p1.buffer_tombstone(exec, table_name, ts).await;
|
||||
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@ use data_types::{
|
|||
Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
|
||||
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
|
||||
PartitionId, PartitionInfo, PartitionKey, PartitionParam, ProcessedTombstone, QueryPool,
|
||||
QueryPoolId, SequenceNumber, Shard, ShardId, Table, TableId, TablePartition,
|
||||
TableSchema, Timestamp, Tombstone, TombstoneId,
|
||||
QueryPoolId, SequenceNumber, Shard, ShardId, Table, TableId, TablePartition, TableSchema,
|
||||
Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use iox_time::TimeProvider;
|
||||
use snafu::{OptionExt, Snafu};
|
||||
|
|
|
|||
|
|
@ -13,8 +13,7 @@
|
|||
|
||||
use crate::interface::{ColumnUpsertRequest, Error, RepoCollection, Result, Transaction};
|
||||
use data_types::{
|
||||
ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Shard, ShardId,
|
||||
TableSchema,
|
||||
ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, QueryPool, Shard, ShardId, TableSchema,
|
||||
};
|
||||
use mutable_batch::MutableBatch;
|
||||
use std::{borrow::Cow, collections::BTreeMap};
|
||||
|
|
|
|||
|
|
@ -691,22 +691,23 @@ impl PartitionRepo for MemTxn {
|
|||
) -> Result<Partition> {
|
||||
let stage = self.stage();
|
||||
|
||||
let partition = match stage.partitions.iter().find(|p| {
|
||||
p.partition_key == key && p.shard_id == shard_id && p.table_id == table_id
|
||||
}) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let p = Partition {
|
||||
id: PartitionId::new(stage.partitions.len() as i64 + 1),
|
||||
shard_id,
|
||||
table_id,
|
||||
partition_key: key,
|
||||
sort_key: vec![],
|
||||
};
|
||||
stage.partitions.push(p);
|
||||
stage.partitions.last().unwrap()
|
||||
}
|
||||
};
|
||||
let partition =
|
||||
match stage.partitions.iter().find(|p| {
|
||||
p.partition_key == key && p.shard_id == shard_id && p.table_id == table_id
|
||||
}) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let p = Partition {
|
||||
id: PartitionId::new(stage.partitions.len() as i64 + 1),
|
||||
shard_id,
|
||||
table_id,
|
||||
partition_key: key,
|
||||
sort_key: vec![],
|
||||
};
|
||||
stage.partitions.push(p);
|
||||
stage.partitions.last().unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
Ok(partition.clone())
|
||||
}
|
||||
|
|
@ -829,9 +830,7 @@ impl TombstoneRepo for MemTxn {
|
|||
let stage = self.stage();
|
||||
|
||||
let tombstone = match stage.tombstones.iter().find(|t| {
|
||||
t.table_id == table_id
|
||||
&& t.shard_id == shard_id
|
||||
&& t.sequence_number == sequence_number
|
||||
t.table_id == table_id && t.shard_id == shard_id && t.sequence_number == sequence_number
|
||||
}) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
|
|
|
|||
|
|
@ -7,8 +7,7 @@ use arrow::{
|
|||
use data_types::{
|
||||
Column, ColumnSet, ColumnType, CompactionLevel, KafkaPartition, KafkaTopic, Namespace,
|
||||
NamespaceSchema, ParquetFile, ParquetFileParams, Partition, PartitionId, QueryPool,
|
||||
SequenceNumber, Shard, ShardId, Table, TableId, TableSchema, Timestamp, Tombstone,
|
||||
TombstoneId,
|
||||
SequenceNumber, Shard, ShardId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use datafusion::physical_plan::metrics::Count;
|
||||
use iox_catalog::{
|
||||
|
|
@ -184,10 +183,7 @@ impl TestCatalog {
|
|||
}
|
||||
|
||||
/// List level 0 files
|
||||
pub async fn list_level_0_files(
|
||||
self: &Arc<Self>,
|
||||
shard_id: ShardId,
|
||||
) -> Vec<ParquetFile> {
|
||||
pub async fn list_level_0_files(self: &Arc<Self>, shard_id: ShardId) -> Vec<ParquetFile> {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
|
|
@ -299,10 +295,7 @@ pub struct TestTable {
|
|||
|
||||
impl TestTable {
|
||||
/// Attach a shard to the table
|
||||
pub fn with_shard(
|
||||
self: &Arc<Self>,
|
||||
shard: &Arc<TestShard>,
|
||||
) -> Arc<TestTableBoundShard> {
|
||||
pub fn with_shard(self: &Arc<Self>, shard: &Arc<TestShard>) -> Arc<TestTableBoundShard> {
|
||||
assert!(Arc::ptr_eq(&self.catalog, &shard.catalog));
|
||||
assert!(Arc::ptr_eq(&self.namespace, &shard.namespace));
|
||||
|
||||
|
|
|
|||
|
|
@ -89,8 +89,8 @@
|
|||
use bytes::Bytes;
|
||||
use data_types::{
|
||||
ColumnId, ColumnSet, ColumnSummary, CompactionLevel, InfluxDbType, NamespaceId,
|
||||
ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, ShardId, StatValues,
|
||||
Statistics, TableId, Timestamp,
|
||||
ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, ShardId, StatValues, Statistics,
|
||||
TableId, Timestamp,
|
||||
};
|
||||
use generated_types::influxdata::iox::ingester::v1 as proto;
|
||||
use iox_time::Time;
|
||||
|
|
|
|||
|
|
@ -166,9 +166,7 @@ mod tests {
|
|||
use crate::metadata::IoxParquetMetaData;
|
||||
use arrow::array::{ArrayRef, StringArray};
|
||||
use bytes::Bytes;
|
||||
use data_types::{
|
||||
CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId,
|
||||
};
|
||||
use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
|
||||
use iox_time::Time;
|
||||
use parquet::{
|
||||
arrow::{ArrowReader, ParquetFileArrowReader},
|
||||
|
|
|
|||
|
|
@ -405,9 +405,7 @@ fn project_for_parquet_reader(
|
|||
mod tests {
|
||||
use super::*;
|
||||
use arrow::array::{ArrayRef, Int64Array, StringArray};
|
||||
use data_types::{
|
||||
CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId,
|
||||
};
|
||||
use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
|
||||
use datafusion::common::DataFusionError;
|
||||
use iox_time::Time;
|
||||
use std::collections::HashMap;
|
||||
|
|
|
|||
|
|
@ -475,10 +475,7 @@ mod tests {
|
|||
table.create_column("time", ColumnType::Time).await;
|
||||
let sequencer1 = ns.create_shard(1).await;
|
||||
|
||||
let partition = table
|
||||
.with_shard(&sequencer1)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer1).create_partition("k").await;
|
||||
|
||||
(table, partition)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -250,10 +250,7 @@ mod tests {
|
|||
table.create_column("time", ColumnType::Time).await;
|
||||
let sequencer1 = ns.create_shard(1).await;
|
||||
|
||||
let partition = table
|
||||
.with_shard(&sequencer1)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer1).create_partition("k").await;
|
||||
|
||||
(catalog, partition)
|
||||
}
|
||||
|
|
@ -328,10 +325,7 @@ mod tests {
|
|||
table.create_column("time", ColumnType::Time).await;
|
||||
let sequencer1 = ns.create_shard(1).await;
|
||||
|
||||
let partition = table
|
||||
.with_shard(&sequencer1)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition = table.with_shard(&sequencer1).create_partition("k").await;
|
||||
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&format!("{table_name} foo=1 11"));
|
||||
|
|
|
|||
|
|
@ -549,18 +549,9 @@ mod tests {
|
|||
let sequencer1 = ns.create_shard(1).await;
|
||||
let sequencer2 = ns.create_shard(2).await;
|
||||
|
||||
let partition11 = table1
|
||||
.with_shard(&sequencer1)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition12 = table1
|
||||
.with_shard(&sequencer2)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition21 = table2
|
||||
.with_shard(&sequencer1)
|
||||
.create_partition("k")
|
||||
.await;
|
||||
let partition11 = table1.with_shard(&sequencer1).create_partition("k").await;
|
||||
let partition12 = table1.with_shard(&sequencer2).create_partition("k").await;
|
||||
let partition21 = table2.with_shard(&sequencer1).create_partition("k").await;
|
||||
|
||||
table1.create_column("time", ColumnType::Time).await;
|
||||
table1.create_column("foo", ColumnType::F64).await;
|
||||
|
|
@ -761,14 +752,8 @@ mod tests {
|
|||
let ns = catalog.create_namespace("ns").await;
|
||||
let table = ns.create_table("table").await;
|
||||
let sequencer = ns.create_shard(1).await;
|
||||
let partition1 = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("k1")
|
||||
.await;
|
||||
let partition2 = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("k2")
|
||||
.await;
|
||||
let partition1 = table.with_shard(&sequencer).create_partition("k1").await;
|
||||
let partition2 = table.with_shard(&sequencer).create_partition("k2").await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
table.create_column("foo", ColumnType::F64).await;
|
||||
|
||||
|
|
@ -914,14 +899,8 @@ mod tests {
|
|||
let ns = catalog.create_namespace("ns").await;
|
||||
let table = ns.create_table("table").await;
|
||||
let sequencer = ns.create_shard(1).await;
|
||||
let partition1 = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("k1")
|
||||
.await;
|
||||
let partition2 = table
|
||||
.with_shard(&sequencer)
|
||||
.create_partition("k2")
|
||||
.await;
|
||||
let partition1 = table.with_shard(&sequencer).create_partition("k1").await;
|
||||
let partition2 = table.with_shard(&sequencer).create_partition("k2").await;
|
||||
|
||||
let schema = Arc::new(
|
||||
SchemaBuilder::new()
|
||||
|
|
|
|||
|
|
@ -104,8 +104,7 @@ impl Reconciler {
|
|||
tombstones.into_iter().map(QuerierTombstone::from).collect();
|
||||
|
||||
// match chunks and tombstones
|
||||
let mut tombstones_by_shard: HashMap<ShardId, Vec<QuerierTombstone>> =
|
||||
HashMap::new();
|
||||
let mut tombstones_by_shard: HashMap<ShardId, Vec<QuerierTombstone>> = HashMap::new();
|
||||
|
||||
for tombstone in querier_tombstones {
|
||||
tombstones_by_shard
|
||||
|
|
@ -129,8 +128,7 @@ impl Reconciler {
|
|||
Vec::with_capacity(parquet_files.len() + ingester_partitions.len());
|
||||
|
||||
for chunk in parquet_files.into_iter() {
|
||||
let chunk = if let Some(tombstones) =
|
||||
tombstones_by_shard.get(&chunk.meta().shard_id())
|
||||
let chunk = if let Some(tombstones) = tombstones_by_shard.get(&chunk.meta().shard_id())
|
||||
{
|
||||
let mut delete_predicates = Vec::with_capacity(tombstones.len());
|
||||
for tombstone in tombstones {
|
||||
|
|
|
|||
|
|
@ -14,9 +14,7 @@ use generated_types::{
|
|||
};
|
||||
use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
|
||||
use ingester::{
|
||||
data::{
|
||||
FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, ShardData,
|
||||
},
|
||||
data::{FlatIngesterQueryResponse, IngesterData, IngesterQueryResponse, Persister, ShardData},
|
||||
lifecycle::LifecycleHandle,
|
||||
querier_handler::prepare_data_to_querier,
|
||||
};
|
||||
|
|
@ -694,10 +692,7 @@ impl MockIngester {
|
|||
|
||||
let shards = BTreeMap::from([(
|
||||
sequencer.shard.id,
|
||||
ShardData::new(
|
||||
sequencer.shard.kafka_partition,
|
||||
catalog.metric_registry(),
|
||||
),
|
||||
ShardData::new(sequencer.shard.kafka_partition, catalog.metric_registry()),
|
||||
)]);
|
||||
let ingester_data = Arc::new(IngesterData::new(
|
||||
catalog.object_store(),
|
||||
|
|
@ -730,11 +725,7 @@ impl MockIngester {
|
|||
|
||||
let should_pause = self
|
||||
.ingester_data
|
||||
.buffer_operation(
|
||||
self.sequencer.shard.id,
|
||||
dml_operation,
|
||||
&lifecycle_handle,
|
||||
)
|
||||
.buffer_operation(self.sequencer.shard.id, dml_operation, &lifecycle_handle)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!should_pause);
|
||||
|
|
|
|||
|
|
@ -218,9 +218,7 @@ mod tests {
|
|||
let eq = SequenceNumber::new(1);
|
||||
let gt = SequenceNumber::new(2);
|
||||
|
||||
let progress = ShardProgress::new()
|
||||
.with_buffered(eq)
|
||||
.with_persisted(lt);
|
||||
let progress = ShardProgress::new().with_buffered(eq).with_persisted(lt);
|
||||
|
||||
assert!(progress.readable(lt));
|
||||
assert!(progress.persisted(lt));
|
||||
|
|
@ -238,9 +236,7 @@ mod tests {
|
|||
let eq = SequenceNumber::new(1);
|
||||
let gt = SequenceNumber::new(2);
|
||||
|
||||
let progress = ShardProgress::new()
|
||||
.with_buffered(eq)
|
||||
.with_persisted(eq);
|
||||
let progress = ShardProgress::new().with_buffered(eq).with_persisted(eq);
|
||||
|
||||
assert!(progress.readable(lt));
|
||||
assert!(progress.persisted(lt));
|
||||
|
|
@ -282,9 +278,7 @@ mod tests {
|
|||
|
||||
let progress1 = ShardProgress::new().with_buffered(gt);
|
||||
|
||||
let progress2 = ShardProgress::new()
|
||||
.with_buffered(lt)
|
||||
.with_persisted(eq);
|
||||
let progress2 = ShardProgress::new().with_buffered(lt).with_persisted(eq);
|
||||
|
||||
let expected = ShardProgress::new()
|
||||
.with_buffered(lt)
|
||||
|
|
|
|||
Loading…
Reference in New Issue