Merge branch 'main' into dom/replication-proto
commit c2f479d370
@@ -2578,9 +2578,9 @@ dependencies = [
 [[package]]
 name = "insta"
-version = "1.24.0"
+version = "1.24.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e965b4a6e9d638d7af8e0bce7f650f4a31bc0f21f4ce891015822a81fac314a9"
+checksum = "eb5686bd8e9239eabe90bb30a0c341bffd6fdc177fb556708f2cb792bf00352d"
 dependencies = [
  "console",
  "lazy_static",
@@ -241,7 +241,7 @@ pub enum IngesterMapping {
 }

 /// Unique ID for a `Partition`
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, sqlx::FromRow)]
 #[sqlx(transparent)]
 pub struct PartitionId(i64);
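Note: the newly derived `sqlx::FromRow` is what lets `PartitionId` be used directly as the row type of `sqlx::query_as` — the Postgres implementation of `partitions_with_recent_created_files` further below decodes its single-column `SELECT DISTINCT partition_id` result straight into `Vec<PartitionId>`.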
@@ -13,4 +13,4 @@ workspace-hack = { path = "../workspace-hack"}
 [dev-dependencies] # In alphabetical order
 test_helpers = { path = "../test_helpers" }
 assert_matches = "1"
-insta = { version = "1.24.0", features = ["yaml"] }
+insta = { version = "1.24.1", features = ["yaml"] }
@@ -0,0 +1,2 @@
+-- This index will be used for selecting partitions with parquet files created after a given time
+CREATE INDEX IF NOT EXISTS parquet_file_partition_created_idx ON parquet_file (partition_id, created_at);
@@ -625,6 +625,14 @@ pub trait ParquetFileRepo: Send + Sync {
         max_time: Timestamp,
     ) -> Result<Vec<ParquetFile>>;

+    /// Select partitions for cold/warm/hot compaction.
+    /// These are partitions with files created recently (i.e. after the specified `time_in_the_past`).
+    /// The files include all compaction levels, and both non-deleted and soft-deleted files.
+    async fn partitions_with_recent_created_files(
+        &mut self,
+        time_in_the_past: Timestamp,
+    ) -> Result<Vec<PartitionId>>;
+
     /// List the most recent highest throughput partitions for a given shard, if specified
     async fn recent_highest_throughput_partitions(
         &mut self,
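For orientation, a hypothetical caller sketch (not part of this diff) showing how a compactor might use the new trait method; the four-hour threshold and surrounding bindings are illustrative, while `time_provider().hours_ago(..)` and the repository accessors match those exercised in the test below:

```rust
// Hypothetical sketch: select partitions that received parquet files in the
// last 4 hours as compaction candidates.
let time_in_the_past = Timestamp::from(catalog.time_provider().hours_ago(4));
let mut repos = catalog.repositories().await;
let candidates: Vec<PartitionId> = repos
    .parquet_files()
    .partitions_with_recent_created_files(time_in_the_past)
    .await?;
```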
@@ -947,6 +955,7 @@ pub(crate) mod test_helpers {
         test_parquet_file(Arc::clone(&catalog)).await;
         test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
         test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
+        test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
         test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
         test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
         test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
@@ -3684,6 +3693,236 @@ pub(crate) mod test_helpers {
         repos.abort().await.unwrap();
     }

+    async fn test_partitions_with_recent_created_files(catalog: Arc<dyn Catalog>) {
+        let mut repos = catalog.repositories().await;
+        let topic = repos
+            .topics()
+            .create_or_get("recent_created_files")
+            .await
+            .unwrap();
+        let pool = repos
+            .query_pools()
+            .create_or_get("recent_created_files")
+            .await
+            .unwrap();
+        let namespace = repos
+            .namespaces()
+            .create(
+                "test_partitions_with_recent_created_files",
+                None,
+                topic.id,
+                pool.id,
+            )
+            .await
+            .unwrap();
+        let table = repos
+            .tables()
+            .create_or_get("test_table_for_recent_created_files", namespace.id)
+            .await
+            .unwrap();
+        let shard = repos
+            .shards()
+            .create_or_get(&topic, ShardIndex::new(101))
+            .await
+            .unwrap();
+
+        // params for the tests
+        let time_now = Timestamp::from(catalog.time_provider().now());
+        let time_one_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(1));
+        let time_two_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(2));
+        let time_three_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(3));
+        let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));
+
+        // DB has no partitions
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        assert!(partitions.is_empty());
+
+        // -----------------
+        // PARTITION one
+        // The DB has 1 partition but it does not have any files
+        let partition1 = repos
+            .partitions()
+            .create_or_get("one".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        assert!(partitions.is_empty());
+
+        // create files for partition one
+
+        let parquet_file_params = ParquetFileParams {
+            shard_id: shard.id,
+            namespace_id: namespace.id,
+            table_id: partition1.table_id,
+            partition_id: partition1.id,
+            object_store_id: Uuid::new_v4(),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(10),
+            file_size_bytes: 1337,
+            row_count: 0,
+            compaction_level: CompactionLevel::Initial,
+            created_at: time_three_hour_ago,
+            column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
+        };
+
+        // create a deleted L0 file that was created 3 hours ago
+        let delete_l0_file = repos
+            .parquet_files()
+            .create(parquet_file_params.clone())
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(delete_l0_file.id)
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // still empty because the file was not recently created
+        assert!(partitions.is_empty());
+
+        // create a deleted L0 file that was created 1 hour ago, which is recent
+        let l0_one_hour_ago_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            created_at: time_one_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(l0_one_hour_ago_file_params.clone())
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // partition one should be returned
+        assert_eq!(partitions.len(), 1);
+        assert!(partitions.contains(&partition1.id));
+
+        // -----------------
+        // PARTITION two
+        // Partition two without any files
+        let partition2 = repos
+            .partitions()
+            .create_or_get("two".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // should return partition one only
+        assert_eq!(partitions.len(), 1);
+        assert!(partitions.contains(&partition1.id));
+
+        // Add an L0 file created non-recently (5 hours ago)
+        let l0_five_hour_ago_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            created_at: time_five_hour_ago,
+            partition_id: partition2.id,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(l0_five_hour_ago_file_params.clone())
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // still returns partition one only
+        assert_eq!(partitions.len(), 1);
+        assert!(partitions.contains(&partition1.id));
+
+        // Add an L1 file created recently (just now)
+        let l1_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            created_at: time_now,
+            partition_id: partition2.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(l1_file_params.clone())
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // should return both partitions
+        assert_eq!(partitions.len(), 2);
+        assert!(partitions.contains(&partition1.id));
+        assert!(partitions.contains(&partition2.id));
+
+        // -----------------
+        // PARTITION three
+        // Partition three without any files
+        let partition3 = repos
+            .partitions()
+            .create_or_get("three".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // should return partitions one and two only
+        assert_eq!(partitions.len(), 2);
+        assert!(partitions.contains(&partition1.id));
+        assert!(partitions.contains(&partition2.id));
+
+        // add an L0 file created recently (one hour ago)
+        let l0_one_hour_ago_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            created_at: time_one_hour_ago,
+            partition_id: partition3.id,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(l0_one_hour_ago_file_params.clone())
+            .await
+            .unwrap();
+        let partitions = repos
+            .parquet_files()
+            .partitions_with_recent_created_files(time_two_hour_ago)
+            .await
+            .unwrap();
+        // should return all partitions
+        assert_eq!(partitions.len(), 3);
+        assert!(partitions.contains(&partition1.id));
+        assert!(partitions.contains(&partition2.id));
+        assert!(partitions.contains(&partition3.id));
+
+        // drop the namespace to keep the data created in this test from affecting other tests
+        repos
+            .namespaces()
+            .delete("test_partitions_with_recent_created_files")
+            .await
+            .expect("delete namespace should succeed");
+    }
+
     async fn test_recent_highest_throughput_partitions(catalog: Arc<dyn Catalog>) {
         let mut repos = catalog.repositories().await;
         let topic = repos
@@ -1317,6 +1317,20 @@ impl ParquetFileRepo for MemTxn {
             .cloned()
             .collect())
     }

+    async fn partitions_with_recent_created_files(
+        &mut self,
+        time_in_the_past: Timestamp,
+    ) -> Result<Vec<PartitionId>> {
+        let stage = self.stage();
+
+        let partitions: Vec<_> = stage
+            .parquet_files
+            .iter()
+            .filter(|f| f.created_at > time_in_the_past)
+            .map(|f| f.partition_id)
+            .collect();
+        Ok(partitions)
+    }
+
     async fn recent_highest_throughput_partitions(
         &mut self,
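One observation about this in-memory implementation: unlike the Postgres version further below, which uses `SELECT DISTINCT`, it returns one entry per matching file, so a partition with several recently created files would appear more than once. A deduplicating sketch (assuming `PartitionId: Eq + Hash`, which the derives above provide):

```rust
use std::collections::HashSet;

// Collect into a set first so each partition id appears at most once.
let partitions: Vec<_> = stage
    .parquet_files
    .iter()
    .filter(|f| f.created_at > time_in_the_past)
    .map(|f| f.partition_id)
    .collect::<HashSet<_>>()
    .into_iter()
    .collect();
```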
@@ -288,6 +288,7 @@ decorate!(
     "parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
     "parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
     "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
+    "partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp) -> Result<Vec<PartitionId>>;
     "recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
     "parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option<ShardId>, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
     "most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
@@ -1876,6 +1876,23 @@ WHERE parquet_file.shard_id = $1
         .map_err(|e| Error::SqlxError { source: e })
     }

+    async fn partitions_with_recent_created_files(
+        &mut self,
+        time_in_the_past: Timestamp,
+    ) -> Result<Vec<PartitionId>> {
+        sqlx::query_as::<_, PartitionId>(
+            r#"
+SELECT distinct partition_id
+FROM parquet_file
+WHERE created_at > $1;
+            "#,
+        )
+        .bind(time_in_the_past) // $1
+        .fetch_all(&mut self.inner)
+        .await
+        .map_err(|e| Error::SqlxError { source: e })
+    }
+
     async fn recent_highest_throughput_partitions(
         &mut self,
         shard_id: Option<ShardId>,
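Note: this `WHERE created_at > $1` scan is the query the new `parquet_file_partition_created_idx (partition_id, created_at)` migration above is intended to serve; since both referenced columns are covered by the index, Postgres can presumably answer it with an index-only scan even though `created_at` is not the leading key.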
@@ -1474,22 +1474,21 @@ fn columns_in_predicates(
     table_name: &str,
     predicate: &Predicate,
 ) -> Option<Vec<usize>> {
-    let mut columns = StdHashSet::new();
-
     // columns in field_columns
-    match &predicate.field_columns {
-        Some(field_columns) => {
-            for field in field_columns {
-                columns.insert(Column::from_name(field));
-            }
-        }
+    let mut columns = match &predicate.field_columns {
+        Some(field_columns) => field_columns
+            .iter()
+            .map(Column::from_name)
+            .collect::<StdHashSet<_>>(),
         None => {
             if need_fields {
                 // fields wanted and `field_columns` being empty means all fields will be needed
                 return None;
+            } else {
+                StdHashSet::new()
             }
         }
-    }
+    };

     // columns in exprs
     let expr_cols_result =
@@ -1506,35 +1505,35 @@ fn columns_in_predicates(
     let projection = if expr_cols_result.is_err() || val_exprs_cols_result.is_err() {
         if expr_cols_result.is_err() {
             let error_message = expr_cols_result.err().unwrap().to_string();
-            warn!(?table_name, ?predicate.exprs, ?error_message, "cannot determine columns in predicate.exprs");
+            warn!(table_name, ?predicate.exprs, ?error_message, "cannot determine columns in predicate.exprs");
         }
         if val_exprs_cols_result.is_err() {
             let error_message = val_exprs_cols_result.err().unwrap().to_string();
-            warn!(?table_name, ?predicate.value_expr, ?error_message, "cannot determine columns in predicate.value_expr");
+            warn!(table_name, ?predicate.value_expr, ?error_message, "cannot determine columns in predicate.value_expr");
         }

         None
     } else {
         // convert the column names into their corresponding indexes in the schema
-        let cols = columns
-            .iter()
-            .map(|c| table_schema.find_index_of(&c.name))
-            .collect::<Vec<_>>();
-
-        if cols.contains(&None) || cols.is_empty() {
-            // At least one column has no matching index, we do not know which
-            // columns to filter. Read all columns
-            warn!(
-                ?table_name,
-                ?predicate,
-                ?table_schema,
-                "cannot find index for at least one column in the table schema"
-            );
-            None
-        } else {
-            // We know which columns to filter, read only those columns
-            Some(cols.into_iter().flatten().collect::<Vec<_>>())
-        }
+        if columns.is_empty() {
+            return None;
+        }
+
+        let mut indices = Vec::with_capacity(columns.len());
+        for c in columns {
+            if let Some(idx) = table_schema.find_index_of(&c.name) {
+                indices.push(idx);
+            } else {
+                warn!(
+                    table_name,
+                    column=c.name.as_str(),
+                    table_columns=?table_schema.iter().map(|(_t, f)| f.name()).collect::<Vec<_>>(),
+                    "cannot find predicate column (field column, value expr, filter expression) in table schema",
+                );
+                return None;
+            }
+        }
+        Some(indices)
     };

     projection
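Note on the refactor: the observable behavior is unchanged — an empty or unresolvable column set still yields `None`, i.e. read all columns — but the rewrite exits on the first missing column instead of materializing an `Option` per column, and the warning now names the offending column and the table's columns rather than dumping the whole predicate and schema.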
@@ -18,9 +18,8 @@ use object_store::DynObjectStore;
 use observability_deps::tracing::info;
 use router::{
     dml_handlers::{
-        write_service_client, DmlHandler, DmlHandlerChainExt, FanOutAdaptor,
-        InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
-        ShardedWriteBuffer, WriteSummaryAdapter,
+        DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner,
+        RetentionValidator, RpcWrite, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter,
     },
     namespace_cache::{
         metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
@@ -259,14 +258,13 @@ pub async fn create_router2_server_type(
     // Hack to handle multiple ingester addresses separated by commas in potentially many uses of
     // the CLI arg
     let ingester_addresses = router_config.ingester_addresses.join(",");
-    let ingester_addresses_list: Vec<_> = ingester_addresses.split(',').collect();
-    let mut ingester_clients = Vec::with_capacity(ingester_addresses_list.len());
-    for ingester_addr in ingester_addresses_list {
-        ingester_clients.push(write_service_client(ingester_addr).await);
-    }
+    let grpc_connections = router::dml_handlers::build_ingester_connection(
+        ingester_addresses.split(',').map(|s| format!("http://{s}")),
+    );

     // Initialise the DML handler that sends writes to the ingester using the RPC write path.
-    let rpc_writer = RpcWrite::new(RoundRobin::new(ingester_clients));
+    let rpc_writer = RpcWrite::new(RoundRobin::new([grpc_connections]));
     let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
     // 1. END
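Note: after this change the `RoundRobin` sharder is handed a single element — the one load-balanced channel — so request distribution across ingesters now happens inside tonic rather than in the round-robin wrapper.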
@@ -12,19 +12,22 @@ use mutable_batch::MutableBatch;
 use mutable_batch_pb::encode::encode_write;
 use observability_deps::tracing::*;
 use sharder::RoundRobin;
-use std::{fmt::Debug, time::Duration};
+use std::{fmt::Debug, str::FromStr, time::Duration};
 use thiserror::Error;
+use tonic::transport::{Channel, Endpoint};
 use trace::ctx::SpanContext;

-/// Create a client to the ingester's write service.
-pub async fn write_service_client(
-    ingester_addr: &str,
-) -> WriteServiceClient<client_util::connection::GrpcConnection> {
-    let connection = client_util::connection::Builder::default()
-        .build(format!("http://{}", ingester_addr))
-        .await
-        .unwrap_or_else(|e| panic!("failed to connect to server {ingester_addr}: {e}"));
-    WriteServiceClient::new(connection.into_grpc_connection())
-}
+/// Create a connection to one or more ingesters, load-balancing requests across
+/// all of them.
+///
+/// Connections are lazily established.
+pub fn build_ingester_connection<T>(addrs: impl Iterator<Item = T>) -> WriteServiceClient<Channel>
+where
+    T: AsRef<str>,
+{
+    WriteServiceClient::new(Channel::balance_list(
+        addrs.map(|s| Endpoint::from_str(s.as_ref()).expect("invalid ingester address")),
+    ))
+}

 /// The bound on RPC request duration.
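A minimal usage sketch (the addresses are illustrative, not from the diff); `tonic::transport::Channel::balance_list` is the stock constructor that spreads requests across the supplied endpoints:

```rust
// Hypothetical call site: two ingester endpoints behind one balanced channel.
let client = build_ingester_connection(
    ["http://ingester-0:8083", "http://ingester-1:8083"].into_iter(),
);
```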
@@ -127,6 +130,7 @@ where
             Ok(()) => break,
             Err(e) => warn!(error=%e, "failed ingester rpc write"),
         };
+        tokio::time::sleep(Duration::from_millis(50)).await;
     }
 })
 .await?;
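Note: the added sleep gives the retry loop a fixed 50 ms pause between attempts instead of retrying in a tight loop.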
@@ -12,7 +12,8 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
     async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>;
 }

-/// An implementation of [`WriteClient`] for the tonic gRPC client.
+/// An implementation of [`WriteClient`] for the bespoke IOx wrapper over the
+/// tonic gRPC client.
 #[async_trait]
 impl WriteClient for WriteServiceClient<client_util::connection::GrpcConnection> {
     async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
@@ -21,6 +22,15 @@ impl WriteClient for WriteServiceClient<client_util::connection::GrpcConnection>
     }
 }

+/// An implementation of [`WriteClient`] for the tonic gRPC client.
+#[async_trait]
+impl WriteClient for WriteServiceClient<tonic::transport::Channel> {
+    async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
+        WriteServiceClient::write(&mut self.clone(), op).await?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod mock {
     use std::{collections::VecDeque, sync::Arc};
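Note: the `&mut self.clone()` in the new impl works because tonic clients and the underlying `Channel` are designed to be cheaply cloned — clones share the same connection state — while the generated `write` method requires `&mut self`.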