Merge branch 'main' into dom/split-data
commit
f84ca2a44f
|
@ -22,7 +22,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
debug!(compaction_type, "start collecting partitions to compact");
|
debug!(compaction_type, "start collecting partitions to compact");
|
||||||
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
||||||
let start_time = compactor.time_provider.now();
|
let start_time = compactor.time_provider.now();
|
||||||
let candidates = Backoff::new(&compactor.backoff_config)
|
let (candidates, table_columns) = Backoff::new(&compactor.backoff_config)
|
||||||
.retry_all_errors("cold_partitions_to_compact", || async {
|
.retry_all_errors("cold_partitions_to_compact", || async {
|
||||||
compactor
|
compactor
|
||||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||||
|
@ -41,46 +41,6 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
duration.record(delta);
|
duration.record(delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get extra needed information for selected partitions
|
|
||||||
let start_time = compactor.time_provider.now();
|
|
||||||
|
|
||||||
// Column types and their counts of the tables of the partition candidates
|
|
||||||
debug!(
|
|
||||||
num_candidates=?candidates.len(),
|
|
||||||
compaction_type,
|
|
||||||
"start getting column types for the partition candidates"
|
|
||||||
);
|
|
||||||
let table_columns = Backoff::new(&compactor.backoff_config)
|
|
||||||
.retry_all_errors("table_columns", || async {
|
|
||||||
compactor.table_columns(&candidates).await
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("retry forever");
|
|
||||||
|
|
||||||
// Add other compaction-needed info into selected partitions
|
|
||||||
debug!(
|
|
||||||
num_candidates=?candidates.len(),
|
|
||||||
compaction_type,
|
|
||||||
"start getting additional info for the partition candidates"
|
|
||||||
);
|
|
||||||
let candidates = Backoff::new(&compactor.backoff_config)
|
|
||||||
.retry_all_errors("add_info_to_partitions", || async {
|
|
||||||
compactor.add_info_to_partitions(&candidates).await
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("retry forever");
|
|
||||||
|
|
||||||
if let Some(delta) = compactor
|
|
||||||
.time_provider
|
|
||||||
.now()
|
|
||||||
.checked_duration_since(start_time)
|
|
||||||
{
|
|
||||||
let duration = compactor
|
|
||||||
.partitions_extra_info_reading_duration
|
|
||||||
.recorder(attributes.clone());
|
|
||||||
duration.record(delta);
|
|
||||||
}
|
|
||||||
|
|
||||||
let n_candidates = candidates.len();
|
let n_candidates = candidates.len();
|
||||||
if n_candidates == 0 {
|
if n_candidates == 0 {
|
||||||
debug!(compaction_type, "no compaction candidates found");
|
debug!(compaction_type, "no compaction candidates found");
|
||||||
|
@ -96,7 +56,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
Arc::clone(&compactor),
|
Arc::clone(&compactor),
|
||||||
compaction_type,
|
compaction_type,
|
||||||
compact_in_parallel,
|
compact_in_parallel,
|
||||||
candidates.clone(),
|
candidates.into(),
|
||||||
table_columns,
|
table_columns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -436,14 +396,13 @@ mod tests {
|
||||||
|
|
||||||
// ------------------------------------------------
|
// ------------------------------------------------
|
||||||
// Compact
|
// Compact
|
||||||
let candidates = compactor
|
let (mut candidates, _table_columns) = compactor
|
||||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
let c = candidates.pop_front().unwrap();
|
let c = candidates.pop().unwrap();
|
||||||
|
|
||||||
let parquet_files_for_compaction =
|
let parquet_files_for_compaction =
|
||||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||||
|
@ -633,14 +592,13 @@ mod tests {
|
||||||
|
|
||||||
// ------------------------------------------------
|
// ------------------------------------------------
|
||||||
// Compact
|
// Compact
|
||||||
let candidates = compactor
|
let (mut candidates, _table_columns) = compactor
|
||||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
let c = candidates.pop_front().unwrap();
|
let c = candidates.pop().unwrap();
|
||||||
|
|
||||||
let parquet_files_for_compaction =
|
let parquet_files_for_compaction =
|
||||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||||
|
|
|
@ -18,7 +18,7 @@ use parquet_file::storage::ParquetStorage;
|
||||||
use schema::sort::SortKey;
|
use schema::sort::SortKey;
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet, VecDeque},
|
collections::{HashMap, HashSet},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -276,15 +276,19 @@ impl Compactor {
|
||||||
// Minimum number of the most recent writes per partition we want to count
|
// Minimum number of the most recent writes per partition we want to count
|
||||||
// to prioritize partitions
|
// to prioritize partitions
|
||||||
min_recent_ingested_files: usize,
|
min_recent_ingested_files: usize,
|
||||||
) -> Result<Vec<PartitionParam>> {
|
) -> Result<(
|
||||||
|
Vec<Arc<PartitionCompactionCandidateWithInfo>>,
|
||||||
|
HashMap<TableId, Vec<ColumnTypeCount>>,
|
||||||
|
)> {
|
||||||
|
let compaction_type = "hot";
|
||||||
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||||
let mut repos = self.catalog.repositories().await;
|
|
||||||
|
|
||||||
for shard_id in &self.shards {
|
for shard_id in &self.shards {
|
||||||
let attributes = Attributes::from([
|
let attributes = Attributes::from([
|
||||||
("shard_id", format!("{}", *shard_id).into()),
|
("shard_id", format!("{}", *shard_id).into()),
|
||||||
("partition_type", "hot".into()),
|
("partition_type", compaction_type.into()),
|
||||||
]);
|
]);
|
||||||
|
let mut repos = self.catalog.repositories().await;
|
||||||
|
|
||||||
// Get the most recent highest ingested throughput partitions within
|
// Get the most recent highest ingested throughput partitions within
|
||||||
// the last 4 hours. If not, increase to 24 hours
|
// the last 4 hours. If not, increase to 24 hours
|
||||||
|
@ -326,13 +330,41 @@ impl Compactor {
|
||||||
debug!(
|
debug!(
|
||||||
shard_id = shard_id.get(),
|
shard_id = shard_id.get(),
|
||||||
n = num_partitions,
|
n = num_partitions,
|
||||||
"hot compaction candidates",
|
compaction_type,
|
||||||
|
"compaction candidates",
|
||||||
);
|
);
|
||||||
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
|
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
|
||||||
number_gauge.set(num_partitions as u64);
|
number_gauge.set(num_partitions as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(candidates)
|
// Get extra needed information for selected partitions
|
||||||
|
let start_time = self.time_provider.now();
|
||||||
|
|
||||||
|
// Column types and their counts of the tables of the partition candidates
|
||||||
|
debug!(
|
||||||
|
num_candidates=?candidates.len(),
|
||||||
|
compaction_type,
|
||||||
|
"start getting column types for the partition candidates"
|
||||||
|
);
|
||||||
|
let table_columns = self.table_columns(&candidates).await?;
|
||||||
|
|
||||||
|
// Add other compaction-needed info into selected partitions
|
||||||
|
debug!(
|
||||||
|
num_candidates=?candidates.len(),
|
||||||
|
compaction_type,
|
||||||
|
"start getting additional info for the partition candidates"
|
||||||
|
);
|
||||||
|
let candidates = self.add_info_to_partitions(&candidates).await?;
|
||||||
|
|
||||||
|
if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) {
|
||||||
|
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
||||||
|
let duration = self
|
||||||
|
.partitions_extra_info_reading_duration
|
||||||
|
.recorder(attributes);
|
||||||
|
duration.record(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((candidates, table_columns))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a list of partitions that:
|
/// Return a list of partitions that:
|
||||||
|
@ -345,20 +377,24 @@ impl Compactor {
|
||||||
&self,
|
&self,
|
||||||
// Max number of cold partitions per shard we want to compact
|
// Max number of cold partitions per shard we want to compact
|
||||||
max_num_partitions_per_shard: usize,
|
max_num_partitions_per_shard: usize,
|
||||||
) -> Result<Vec<PartitionParam>> {
|
) -> Result<(
|
||||||
|
Vec<Arc<PartitionCompactionCandidateWithInfo>>,
|
||||||
|
HashMap<TableId, Vec<ColumnTypeCount>>,
|
||||||
|
)> {
|
||||||
|
let compaction_type = "cold";
|
||||||
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
let mut candidates = Vec::with_capacity(self.shards.len() * max_num_partitions_per_shard);
|
||||||
let mut repos = self.catalog.repositories().await;
|
|
||||||
|
|
||||||
for shard_id in &self.shards {
|
for shard_id in &self.shards {
|
||||||
let attributes = Attributes::from([
|
let attributes = Attributes::from([
|
||||||
("shard_id", format!("{}", *shard_id).into()),
|
("shard_id", format!("{}", *shard_id).into()),
|
||||||
("partition_type", "cold".into()),
|
("partition_type", compaction_type.into()),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let time_8_hours_ago = Timestamp::new(
|
let time_8_hours_ago = Timestamp::new(
|
||||||
(self.time_provider.now() - Duration::from_secs(60 * 60 * 8)).timestamp_nanos(),
|
(self.time_provider.now() - Duration::from_secs(60 * 60 * 8)).timestamp_nanos(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut repos = self.catalog.repositories().await;
|
||||||
let mut partitions = repos
|
let mut partitions = repos
|
||||||
.parquet_files()
|
.parquet_files()
|
||||||
.most_cold_files_partitions(
|
.most_cold_files_partitions(
|
||||||
|
@ -378,17 +414,45 @@ impl Compactor {
|
||||||
debug!(
|
debug!(
|
||||||
shard_id = shard_id.get(),
|
shard_id = shard_id.get(),
|
||||||
n = num_partitions,
|
n = num_partitions,
|
||||||
"cold compaction candidates",
|
compaction_type,
|
||||||
|
"compaction candidates",
|
||||||
);
|
);
|
||||||
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
|
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
|
||||||
number_gauge.set(num_partitions as u64);
|
number_gauge.set(num_partitions as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(candidates)
|
// Get extra needed information for selected partitions
|
||||||
|
let start_time = self.time_provider.now();
|
||||||
|
|
||||||
|
// Column types and their counts of the tables of the partition candidates
|
||||||
|
debug!(
|
||||||
|
num_candidates=?candidates.len(),
|
||||||
|
compaction_type,
|
||||||
|
"start getting column types for the partition candidates"
|
||||||
|
);
|
||||||
|
let table_columns = self.table_columns(&candidates).await?;
|
||||||
|
|
||||||
|
// Add other compaction-needed info into selected partitions
|
||||||
|
debug!(
|
||||||
|
num_candidates=?candidates.len(),
|
||||||
|
compaction_type,
|
||||||
|
"start getting additional info for the partition candidates"
|
||||||
|
);
|
||||||
|
let candidates = self.add_info_to_partitions(&candidates).await?;
|
||||||
|
|
||||||
|
if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) {
|
||||||
|
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
||||||
|
let duration = self
|
||||||
|
.partitions_extra_info_reading_duration
|
||||||
|
.recorder(attributes);
|
||||||
|
duration.record(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((candidates, table_columns))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get column types for tables of given partitions
|
/// Get column types for tables of given partitions
|
||||||
pub async fn table_columns(
|
async fn table_columns(
|
||||||
&self,
|
&self,
|
||||||
partitions: &[PartitionParam],
|
partitions: &[PartitionParam],
|
||||||
) -> Result<HashMap<TableId, Vec<ColumnTypeCount>>> {
|
) -> Result<HashMap<TableId, Vec<ColumnTypeCount>>> {
|
||||||
|
@ -409,10 +473,10 @@ impl Compactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add namespace and table information to partition candidates.
|
/// Add namespace and table information to partition candidates.
|
||||||
pub async fn add_info_to_partitions(
|
async fn add_info_to_partitions(
|
||||||
&self,
|
&self,
|
||||||
partitions: &[PartitionParam],
|
partitions: &[PartitionParam],
|
||||||
) -> Result<VecDeque<Arc<PartitionCompactionCandidateWithInfo>>> {
|
) -> Result<Vec<Arc<PartitionCompactionCandidateWithInfo>>> {
|
||||||
let mut repos = self.catalog.repositories().await;
|
let mut repos = self.catalog.repositories().await;
|
||||||
|
|
||||||
let table_ids: HashSet<_> = partitions.iter().map(|p| p.table_id).collect();
|
let table_ids: HashSet<_> = partitions.iter().map(|p| p.table_id).collect();
|
||||||
|
@ -481,7 +545,7 @@ impl Compactor {
|
||||||
partition_key: part.partition_key.clone(),
|
partition_key: part.partition_key.clone(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect::<VecDeque<_>>())
|
.collect())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -672,7 +736,7 @@ mod tests {
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 1: no files yet --> no partition candidates
|
// Case 1: no files yet --> no partition candidates
|
||||||
//
|
//
|
||||||
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
assert!(candidates.is_empty());
|
assert!(candidates.is_empty());
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
|
@ -696,7 +760,7 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
// No non-deleted level 0 files yet --> no candidates
|
// No non-deleted level 0 files yet --> no candidates
|
||||||
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
assert!(candidates.is_empty());
|
assert!(candidates.is_empty());
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
|
@ -715,7 +779,7 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
|
|
||||||
// No hot candidates
|
// No hot candidates
|
||||||
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
assert!(candidates.is_empty());
|
assert!(candidates.is_empty());
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
|
@ -733,9 +797,9 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
//
|
//
|
||||||
// Has at least one partition with a recent write --> make it a candidate
|
// Has at least one partition with a recent write --> make it a candidate
|
||||||
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 5: has 2 partitions with 2 different groups of recent writes:
|
// Case 5: has 2 partitions with 2 different groups of recent writes:
|
||||||
|
@ -755,9 +819,9 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
//
|
//
|
||||||
// make partitions in the most recent group candidates
|
// make partitions in the most recent group candidates
|
||||||
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (candidates, _table_columns) = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
assert_eq!(candidates[0].partition_id, partition3.id);
|
assert_eq!(candidates[0].id(), partition3.id);
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 6: has partition candidates for 2 shards
|
// Case 6: has partition candidates for 2 shards
|
||||||
|
@ -776,36 +840,24 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
//
|
//
|
||||||
// Will have 2 candidates, one for each shard
|
// Will have 2 candidates, one for each shard
|
||||||
let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
let (mut candidates, _table_columns) =
|
||||||
candidates.sort();
|
compactor.hot_partitions_to_compact(1, 1).await.unwrap();
|
||||||
|
candidates.sort_by_key(|c| c.candidate);
|
||||||
assert_eq!(candidates.len(), 2);
|
assert_eq!(candidates.len(), 2);
|
||||||
assert_eq!(candidates[0].partition_id, partition3.id);
|
|
||||||
assert_eq!(candidates[0].shard_id, shard.id);
|
|
||||||
assert_eq!(candidates[1].partition_id, another_partition.id);
|
|
||||||
assert_eq!(candidates[1].shard_id, another_shard.id);
|
|
||||||
|
|
||||||
// Add info to partition
|
assert_eq!(candidates[0].id(), partition3.id);
|
||||||
let partitions_with_info = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
assert_eq!(candidates[0].shard_id(), shard.id);
|
||||||
assert_eq!(partitions_with_info.len(), 2);
|
assert_eq!(*candidates[0].namespace, namespace);
|
||||||
//
|
assert_eq!(*candidates[0].table, table);
|
||||||
assert_eq!(*partitions_with_info[0].namespace, namespace);
|
assert_eq!(candidates[0].partition_key, partition3.partition_key);
|
||||||
assert_eq!(*partitions_with_info[0].table, table);
|
assert_eq!(candidates[0].sort_key, partition3.sort_key()); // this sort key is None
|
||||||
assert_eq!(
|
|
||||||
partitions_with_info[0].partition_key,
|
assert_eq!(candidates[1].id(), another_partition.id);
|
||||||
partition3.partition_key
|
assert_eq!(candidates[1].shard_id(), another_shard.id);
|
||||||
);
|
assert_eq!(*candidates[1].namespace, namespace);
|
||||||
assert_eq!(partitions_with_info[0].sort_key, partition3.sort_key()); // this sort key is None
|
assert_eq!(*candidates[1].table, another_table);
|
||||||
//
|
assert_eq!(candidates[1].partition_key, another_partition.partition_key);
|
||||||
assert_eq!(*partitions_with_info[1].namespace, namespace);
|
assert_eq!(candidates[1].sort_key, another_partition.sort_key()); // this sort key is Some(tag1, time)
|
||||||
assert_eq!(*partitions_with_info[1].table, another_table);
|
|
||||||
assert_eq!(
|
|
||||||
partitions_with_info[1].partition_key,
|
|
||||||
another_partition.partition_key
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
partitions_with_info[1].sort_key,
|
|
||||||
another_partition.sort_key()
|
|
||||||
); // this sort key is Some(tag1, time)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_compactor_config() -> CompactorConfig {
|
fn make_compactor_config() -> CompactorConfig {
|
||||||
|
@ -952,7 +1004,7 @@ mod tests {
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 1: no files yet --> no partition candidates
|
// Case 1: no files yet --> no partition candidates
|
||||||
//
|
//
|
||||||
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
assert!(candidates.is_empty());
|
assert!(candidates.is_empty());
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
|
@ -973,7 +1025,7 @@ mod tests {
|
||||||
let _pf2 = txn.parquet_files().create(p2).await.unwrap();
|
let _pf2 = txn.parquet_files().create(p2).await.unwrap();
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
// No non-deleted level 0 files yet --> no candidates
|
// No non-deleted level 0 files yet --> no candidates
|
||||||
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
assert!(candidates.is_empty());
|
assert!(candidates.is_empty());
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
|
@ -990,9 +1042,9 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
//
|
//
|
||||||
// Has at least one partition with a L0 file --> make it a candidate
|
// Has at least one partition with a L0 file --> make it a candidate
|
||||||
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
assert_eq!(candidates[0].partition_id, partition2.id);
|
assert_eq!(candidates[0].id(), partition2.id);
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 4: has two cold partitions --> return the candidate with the most L0
|
// Case 4: has two cold partitions --> return the candidate with the most L0
|
||||||
|
@ -1013,9 +1065,9 @@ mod tests {
|
||||||
let _pf5 = txn.parquet_files().create(p5).await.unwrap();
|
let _pf5 = txn.parquet_files().create(p5).await.unwrap();
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
// Partition with the most l0 files is the candidate
|
// Partition with the most l0 files is the candidate
|
||||||
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 5: "warm" and "hot" partitions aren't returned
|
// Case 5: "warm" and "hot" partitions aren't returned
|
||||||
|
@ -1045,21 +1097,21 @@ mod tests {
|
||||||
let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap();
|
let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap();
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
// Partition4 is still the only candidate
|
// Partition4 is still the only candidate
|
||||||
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
|
|
||||||
// Ask for 2 partitions per shard; get partition4 and partition2
|
// Ask for 2 partitions per shard; get partition4 and partition2
|
||||||
let candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(2).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 2);
|
assert_eq!(candidates.len(), 2);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
assert_eq!(candidates[1].partition_id, partition2.id);
|
assert_eq!(candidates[1].id(), partition2.id);
|
||||||
|
|
||||||
// Ask for 3 partitions per shard; still get only partition4 and partition2
|
// Ask for 3 partitions per shard; still get only partition4 and partition2
|
||||||
let candidates = compactor.cold_partitions_to_compact(3).await.unwrap();
|
let (candidates, _table_columns) = compactor.cold_partitions_to_compact(3).await.unwrap();
|
||||||
assert_eq!(candidates.len(), 2);
|
assert_eq!(candidates.len(), 2);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
assert_eq!(candidates[1].partition_id, partition2.id);
|
assert_eq!(candidates[1].id(), partition2.id);
|
||||||
|
|
||||||
// --------------------------------------
|
// --------------------------------------
|
||||||
// Case 6: has partition candidates for 2 shards
|
// Case 6: has partition candidates for 2 shards
|
||||||
|
@ -1078,24 +1130,26 @@ mod tests {
|
||||||
txn.commit().await.unwrap();
|
txn.commit().await.unwrap();
|
||||||
|
|
||||||
// Will have 2 candidates, one for each shard
|
// Will have 2 candidates, one for each shard
|
||||||
let mut candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
|
let (mut candidates, _table_columns) =
|
||||||
candidates.sort();
|
compactor.cold_partitions_to_compact(1).await.unwrap();
|
||||||
|
candidates.sort_by_key(|c| c.candidate);
|
||||||
assert_eq!(candidates.len(), 2);
|
assert_eq!(candidates.len(), 2);
|
||||||
assert_eq!(candidates[0].partition_id, partition4.id);
|
assert_eq!(candidates[0].id(), partition4.id);
|
||||||
assert_eq!(candidates[0].shard_id, shard.id);
|
assert_eq!(candidates[0].shard_id(), shard.id);
|
||||||
assert_eq!(candidates[1].partition_id, another_partition.id);
|
assert_eq!(candidates[1].id(), another_partition.id);
|
||||||
assert_eq!(candidates[1].shard_id, another_shard.id);
|
assert_eq!(candidates[1].shard_id(), another_shard.id);
|
||||||
|
|
||||||
// Ask for 2 candidates per shard; get back 3: 2 from shard and 1 from
|
// Ask for 2 candidates per shard; get back 3: 2 from shard and 1 from
|
||||||
// another_shard
|
// another_shard
|
||||||
let mut candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
|
let (mut candidates, _table_columns) =
|
||||||
candidates.sort();
|
compactor.cold_partitions_to_compact(2).await.unwrap();
|
||||||
|
candidates.sort_by_key(|c| c.candidate);
|
||||||
assert_eq!(candidates.len(), 3);
|
assert_eq!(candidates.len(), 3);
|
||||||
assert_eq!(candidates[0].partition_id, partition2.id);
|
assert_eq!(candidates[0].id(), partition2.id);
|
||||||
assert_eq!(candidates[0].shard_id, shard.id);
|
assert_eq!(candidates[0].shard_id(), shard.id);
|
||||||
assert_eq!(candidates[1].partition_id, partition4.id);
|
assert_eq!(candidates[1].id(), partition4.id);
|
||||||
assert_eq!(candidates[1].shard_id, shard.id);
|
assert_eq!(candidates[1].shard_id(), shard.id);
|
||||||
assert_eq!(candidates[2].partition_id, another_partition.id);
|
assert_eq!(candidates[2].id(), another_partition.id);
|
||||||
assert_eq!(candidates[2].shard_id, another_shard.id);
|
assert_eq!(candidates[2].shard_id(), another_shard.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
debug!(compaction_type, "start collecting partitions to compact");
|
debug!(compaction_type, "start collecting partitions to compact");
|
||||||
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
|
||||||
let start_time = compactor.time_provider.now();
|
let start_time = compactor.time_provider.now();
|
||||||
let candidates = Backoff::new(&compactor.backoff_config)
|
let (candidates, table_columns) = Backoff::new(&compactor.backoff_config)
|
||||||
.retry_all_errors("hot_partitions_to_compact", || async {
|
.retry_all_errors("hot_partitions_to_compact", || async {
|
||||||
compactor
|
compactor
|
||||||
.hot_partitions_to_compact(
|
.hot_partitions_to_compact(
|
||||||
|
@ -37,46 +37,6 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
duration.record(delta);
|
duration.record(delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get extra needed information for selected partitions
|
|
||||||
let start_time = compactor.time_provider.now();
|
|
||||||
|
|
||||||
// Column types and their counts of the tables of the partition candidates
|
|
||||||
debug!(
|
|
||||||
num_candidates=?candidates.len(),
|
|
||||||
compaction_type,
|
|
||||||
"start getting column types for the partition candidates"
|
|
||||||
);
|
|
||||||
let table_columns = Backoff::new(&compactor.backoff_config)
|
|
||||||
.retry_all_errors("table_columns", || async {
|
|
||||||
compactor.table_columns(&candidates).await
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("retry forever");
|
|
||||||
|
|
||||||
// Add other compaction-needed info into selected partitions
|
|
||||||
debug!(
|
|
||||||
num_candidates=?candidates.len(),
|
|
||||||
compaction_type,
|
|
||||||
"start getting additional info for the partition candidates"
|
|
||||||
);
|
|
||||||
let candidates = Backoff::new(&compactor.backoff_config)
|
|
||||||
.retry_all_errors("add_info_to_partitions", || async {
|
|
||||||
compactor.add_info_to_partitions(&candidates).await
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("retry forever");
|
|
||||||
|
|
||||||
if let Some(delta) = compactor
|
|
||||||
.time_provider
|
|
||||||
.now()
|
|
||||||
.checked_duration_since(start_time)
|
|
||||||
{
|
|
||||||
let duration = compactor
|
|
||||||
.partitions_extra_info_reading_duration
|
|
||||||
.recorder(attributes.clone());
|
|
||||||
duration.record(delta);
|
|
||||||
}
|
|
||||||
|
|
||||||
let n_candidates = candidates.len();
|
let n_candidates = candidates.len();
|
||||||
if n_candidates == 0 {
|
if n_candidates == 0 {
|
||||||
debug!(compaction_type, "no compaction candidates found");
|
debug!(compaction_type, "no compaction candidates found");
|
||||||
|
@ -91,7 +51,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
||||||
Arc::clone(&compactor),
|
Arc::clone(&compactor),
|
||||||
compaction_type,
|
compaction_type,
|
||||||
compact_in_parallel,
|
compact_in_parallel,
|
||||||
candidates,
|
candidates.into(),
|
||||||
table_columns,
|
table_columns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -126,11 +86,7 @@ mod tests {
|
||||||
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
|
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
|
||||||
use iox_time::{SystemProvider, TimeProvider};
|
use iox_time::{SystemProvider, TimeProvider};
|
||||||
use parquet_file::storage::ParquetStorage;
|
use parquet_file::storage::ParquetStorage;
|
||||||
use std::{
|
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||||
collections::{HashMap, VecDeque},
|
|
||||||
sync::Arc,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_compact_hot_partition_candidates() {
|
async fn test_compact_hot_partition_candidates() {
|
||||||
|
@ -277,7 +233,7 @@ mod tests {
|
||||||
partition6.create_parquet_file_catalog_record(pf6_2).await;
|
partition6.create_parquet_file_catalog_record(pf6_2).await;
|
||||||
|
|
||||||
// partition candidates: partitions with L0 and overlapped L1
|
// partition candidates: partitions with L0 and overlapped L1
|
||||||
let candidates = compactor
|
let (mut candidates, table_columns) = compactor
|
||||||
.hot_partitions_to_compact(
|
.hot_partitions_to_compact(
|
||||||
compactor.config.max_number_partitions_per_shard,
|
compactor.config.max_number_partitions_per_shard,
|
||||||
compactor
|
compactor
|
||||||
|
@ -288,8 +244,6 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(candidates.len(), 6);
|
assert_eq!(candidates.len(), 6);
|
||||||
|
|
||||||
// column types of the partitions
|
|
||||||
let table_columns = compactor.table_columns(&candidates).await.unwrap();
|
|
||||||
assert_eq!(table_columns.len(), 1);
|
assert_eq!(table_columns.len(), 1);
|
||||||
let mut cols = table_columns.get(&table.table.id).unwrap().clone();
|
let mut cols = table_columns.get(&table.table.id).unwrap().clone();
|
||||||
assert_eq!(cols.len(), 5);
|
assert_eq!(cols.len(), 5);
|
||||||
|
@ -304,11 +258,7 @@ mod tests {
|
||||||
expected_cols.sort_by_key(|c| c.col_type);
|
expected_cols.sort_by_key(|c| c.col_type);
|
||||||
assert_eq!(cols, expected_cols);
|
assert_eq!(cols, expected_cols);
|
||||||
|
|
||||||
// Add other compaction-needed info into selected partitions
|
candidates.sort_by_key(|c| c.candidate.partition_id);
|
||||||
let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
|
||||||
let mut sorted_candidates = candidates.into_iter().collect::<Vec<_>>();
|
|
||||||
sorted_candidates.sort_by_key(|c| c.candidate.partition_id);
|
|
||||||
let sorted_candidates = sorted_candidates.into_iter().collect::<VecDeque<_>>();
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut repos = compactor.catalog.repositories().await;
|
let mut repos = compactor.catalog.repositories().await;
|
||||||
|
@ -330,7 +280,7 @@ mod tests {
|
||||||
Arc::clone(&compactor),
|
Arc::clone(&compactor),
|
||||||
"hot",
|
"hot",
|
||||||
mock_compactor.compaction_function(),
|
mock_compactor.compaction_function(),
|
||||||
sorted_candidates,
|
candidates.into(),
|
||||||
table_columns,
|
table_columns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -571,7 +521,7 @@ mod tests {
|
||||||
|
|
||||||
// ------------------------------------------------
|
// ------------------------------------------------
|
||||||
// Compact
|
// Compact
|
||||||
let candidates = compactor
|
let (mut candidates, _table_columns) = compactor
|
||||||
.hot_partitions_to_compact(
|
.hot_partitions_to_compact(
|
||||||
compactor.config.max_number_partitions_per_shard,
|
compactor.config.max_number_partitions_per_shard,
|
||||||
compactor
|
compactor
|
||||||
|
@ -580,10 +530,9 @@ mod tests {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
let c = candidates.pop_front().unwrap();
|
let c = candidates.pop().unwrap();
|
||||||
|
|
||||||
let parquet_files_for_compaction =
|
let parquet_files_for_compaction =
|
||||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||||
|
|
|
@ -437,7 +437,7 @@ pub mod tests {
|
||||||
.with_creation_time(hot_time_one_hour_ago);
|
.with_creation_time(hot_time_one_hour_ago);
|
||||||
partition1.create_parquet_file_catalog_record(pf1_1).await;
|
partition1.create_parquet_file_catalog_record(pf1_1).await;
|
||||||
|
|
||||||
let candidates = compactor
|
let (mut candidates, _table_columns) = compactor
|
||||||
.hot_partitions_to_compact(
|
.hot_partitions_to_compact(
|
||||||
compactor.config.max_number_partitions_per_shard,
|
compactor.config.max_number_partitions_per_shard,
|
||||||
compactor
|
compactor
|
||||||
|
@ -448,17 +448,14 @@ pub mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(candidates.len(), 1);
|
assert_eq!(candidates.len(), 1);
|
||||||
|
|
||||||
let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
candidates.sort_by_key(|c| c.candidate.partition_id);
|
||||||
let mut sorted_candidates = candidates.into_iter().collect::<Vec<_>>();
|
|
||||||
sorted_candidates.sort_by_key(|c| c.candidate.partition_id);
|
|
||||||
let sorted_candidates = sorted_candidates.into_iter().collect::<VecDeque<_>>();
|
|
||||||
let table_columns = HashMap::new();
|
let table_columns = HashMap::new();
|
||||||
|
|
||||||
compact_candidates_with_memory_budget(
|
compact_candidates_with_memory_budget(
|
||||||
Arc::clone(&compactor),
|
Arc::clone(&compactor),
|
||||||
"hot",
|
"hot",
|
||||||
mock_compactor.compaction_function(),
|
mock_compactor.compaction_function(),
|
||||||
sorted_candidates,
|
candidates.into(),
|
||||||
table_columns,
|
table_columns,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
|
@ -189,7 +189,7 @@ pub(crate) async fn compact_parquet_files(
|
||||||
.sort_key
|
.sort_key
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("no partition sort key in catalog")
|
.expect("no partition sort key in catalog")
|
||||||
.filter_to(&merged_schema.primary_key());
|
.filter_to(&merged_schema.primary_key(), partition_id.get());
|
||||||
|
|
||||||
let (small_cutoff_bytes, large_cutoff_bytes) =
|
let (small_cutoff_bytes, large_cutoff_bytes) =
|
||||||
cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size);
|
cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size);
|
||||||
|
@ -355,7 +355,7 @@ pub(crate) async fn compact_final_no_splits(
|
||||||
.sort_key
|
.sort_key
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("no partition sort key in catalog")
|
.expect("no partition sort key in catalog")
|
||||||
.filter_to(&merged_schema.primary_key());
|
.filter_to(&merged_schema.primary_key(), partition_id.get());
|
||||||
|
|
||||||
let ctx = exec.new_context(ExecutorType::Reorg);
|
let ctx = exec.new_context(ExecutorType::Reorg);
|
||||||
// Compact everything into one file
|
// Compact everything into one file
|
||||||
|
@ -538,7 +538,9 @@ fn to_queryable_parquet_chunk(
|
||||||
.select_by_names(&selection)
|
.select_by_names(&selection)
|
||||||
.expect("schema in-sync");
|
.expect("schema in-sync");
|
||||||
let pk = schema.primary_key();
|
let pk = schema.primary_key();
|
||||||
let sort_key = partition_sort_key.as_ref().map(|sk| sk.filter_to(&pk));
|
let sort_key = partition_sort_key
|
||||||
|
.as_ref()
|
||||||
|
.map(|sk| sk.filter_to(&pk, file.partition_id().get()));
|
||||||
let file = Arc::new(ParquetFile::from(file));
|
let file = Arc::new(ParquetFile::from(file));
|
||||||
|
|
||||||
let parquet_chunk = ParquetChunk::new(Arc::clone(&file), Arc::new(schema), store);
|
let parquet_chunk = ParquetChunk::new(Arc::clone(&file), Arc::new(schema), store);
|
||||||
|
|
|
@ -129,7 +129,9 @@ impl ParquetFileWithTombstone {
|
||||||
.select_by_names(&selection)
|
.select_by_names(&selection)
|
||||||
.expect("schema in-sync");
|
.expect("schema in-sync");
|
||||||
let pk = schema.primary_key();
|
let pk = schema.primary_key();
|
||||||
let sort_key = partition_sort_key.as_ref().map(|sk| sk.filter_to(&pk));
|
let sort_key = partition_sort_key
|
||||||
|
.as_ref()
|
||||||
|
.map(|sk| sk.filter_to(&pk, self.partition_id.get()));
|
||||||
|
|
||||||
let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), Arc::new(schema), store);
|
let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), Arc::new(schema), store);
|
||||||
|
|
||||||
|
|
|
@ -2,13 +2,13 @@
|
||||||
|
|
||||||
use crate::expression::{conditional_expression, Expr};
|
use crate::expression::{conditional_expression, Expr};
|
||||||
use crate::identifier::{identifier, Identifier};
|
use crate::identifier::{identifier, Identifier};
|
||||||
|
use crate::internal::{expect, map_fail, ParseResult};
|
||||||
use core::fmt;
|
use core::fmt;
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::{tag, tag_no_case};
|
use nom::bytes::complete::{tag, tag_no_case};
|
||||||
use nom::character::complete::{digit1, line_ending, multispace0, multispace1};
|
use nom::character::complete::{char, digit1, multispace1};
|
||||||
use nom::combinator::{cut, eof, map, map_res, opt, value};
|
use nom::combinator::{map, opt, value};
|
||||||
use nom::sequence::{delimited, pair, preceded, terminated};
|
use nom::sequence::{pair, preceded, terminated};
|
||||||
use nom::IResult;
|
|
||||||
use std::fmt::Formatter;
|
use std::fmt::Formatter;
|
||||||
|
|
||||||
/// Represents a fully-qualified measurement name.
|
/// Represents a fully-qualified measurement name.
|
||||||
|
@ -48,7 +48,7 @@ impl fmt::Display for MeasurementNameExpression {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Match a 3-part measurement name expression.
|
/// Match a 3-part measurement name expression.
|
||||||
pub fn measurement_name_expression(i: &str) -> IResult<&str, MeasurementNameExpression> {
|
pub fn measurement_name_expression(i: &str) -> ParseResult<&str, MeasurementNameExpression> {
|
||||||
let (remaining_input, (opt_db_rp, name)) = pair(
|
let (remaining_input, (opt_db_rp, name)) = pair(
|
||||||
opt(alt((
|
opt(alt((
|
||||||
// database "." retention_policy "."
|
// database "." retention_policy "."
|
||||||
|
@ -84,30 +84,39 @@ pub fn measurement_name_expression(i: &str) -> IResult<&str, MeasurementNameExpr
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an unsigned integer.
|
/// Parse an unsigned integer.
|
||||||
fn unsigned_number(i: &str) -> IResult<&str, u64> {
|
fn unsigned_number(i: &str) -> ParseResult<&str, u64> {
|
||||||
map_res(digit1, |s: &str| s.parse())(i)
|
map_fail("unable to parse unsigned integer", digit1, &str::parse)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a LIMIT <n> clause.
|
/// Parse a LIMIT <n> clause.
|
||||||
pub fn limit_clause(i: &str) -> IResult<&str, u64> {
|
pub fn limit_clause(i: &str) -> ParseResult<&str, u64> {
|
||||||
preceded(pair(tag_no_case("LIMIT"), multispace1), unsigned_number)(i)
|
preceded(
|
||||||
|
pair(tag_no_case("LIMIT"), multispace1),
|
||||||
|
expect(
|
||||||
|
"invalid LIMIT clause, expected unsigned integer",
|
||||||
|
unsigned_number,
|
||||||
|
),
|
||||||
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an OFFSET <n> clause.
|
/// Parse an OFFSET <n> clause.
|
||||||
pub fn offset_clause(i: &str) -> IResult<&str, u64> {
|
pub fn offset_clause(i: &str) -> ParseResult<&str, u64> {
|
||||||
preceded(pair(tag_no_case("OFFSET"), multispace1), unsigned_number)(i)
|
preceded(
|
||||||
|
pair(tag_no_case("OFFSET"), multispace1),
|
||||||
|
expect(
|
||||||
|
"invalid OFFSET clause, expected unsigned integer",
|
||||||
|
unsigned_number,
|
||||||
|
),
|
||||||
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a terminator that ends a SQL statement.
|
/// Parse a terminator that ends a SQL statement.
|
||||||
pub fn statement_terminator(i: &str) -> IResult<&str, ()> {
|
pub fn statement_terminator(i: &str) -> ParseResult<&str, ()> {
|
||||||
let (remaining_input, _) =
|
value((), char(';'))(i)
|
||||||
delimited(multispace0, alt((tag(";"), line_ending, eof)), multispace0)(i)?;
|
|
||||||
|
|
||||||
Ok((remaining_input, ()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a `WHERE` clause.
|
/// Parse a `WHERE` clause.
|
||||||
pub(crate) fn where_clause(i: &str) -> IResult<&str, Expr> {
|
pub fn where_clause(i: &str) -> ParseResult<&str, Expr> {
|
||||||
preceded(
|
preceded(
|
||||||
pair(tag_no_case("WHERE"), multispace1),
|
pair(tag_no_case("WHERE"), multispace1),
|
||||||
conditional_expression,
|
conditional_expression,
|
||||||
|
@ -144,7 +153,7 @@ pub enum OrderByClause {
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
/// [EBNF]: https://www.w3.org/TR/2010/REC-xquery-20101214/#EBNFNotation
|
/// [EBNF]: https://www.w3.org/TR/2010/REC-xquery-20101214/#EBNFNotation
|
||||||
pub(crate) fn order_by_clause(i: &str) -> IResult<&str, OrderByClause> {
|
pub fn order_by_clause(i: &str) -> ParseResult<&str, OrderByClause> {
|
||||||
let order = || {
|
let order = || {
|
||||||
preceded(
|
preceded(
|
||||||
multispace1,
|
multispace1,
|
||||||
|
@ -161,23 +170,25 @@ pub(crate) fn order_by_clause(i: &str) -> IResult<&str, OrderByClause> {
|
||||||
tag_no_case("ORDER"),
|
tag_no_case("ORDER"),
|
||||||
preceded(multispace1, tag_no_case("BY")),
|
preceded(multispace1, tag_no_case("BY")),
|
||||||
),
|
),
|
||||||
// cut to force failure, as `ORDER BY` must be followed by one of the following
|
expect(
|
||||||
cut(alt((
|
"invalid ORDER BY, expected ASC, DESC or TIME",
|
||||||
// "ASC" | "DESC"
|
alt((
|
||||||
order(),
|
// "ASC" | "DESC"
|
||||||
// "TIME" ( "ASC" | "DESC" )?
|
order(),
|
||||||
map(
|
// "TIME" ( "ASC" | "DESC" )?
|
||||||
preceded(preceded(multispace1, tag_no_case("TIME")), opt(order())),
|
map(
|
||||||
Option::<_>::unwrap_or_default,
|
preceded(preceded(multispace1, tag_no_case("TIME")), opt(order())),
|
||||||
),
|
Option::<_>::unwrap_or_default,
|
||||||
))),
|
),
|
||||||
|
)),
|
||||||
|
),
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::assert_failure;
|
use crate::assert_expect_error;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_measurement_name_expression() {
|
fn test_measurement_name_expression() {
|
||||||
|
@ -187,7 +198,7 @@ mod tests {
|
||||||
MeasurementNameExpression {
|
MeasurementNameExpression {
|
||||||
database: None,
|
database: None,
|
||||||
retention_policy: None,
|
retention_policy: None,
|
||||||
name: Identifier::Unquoted("diskio".into()),
|
name: "diskio".into(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -195,9 +206,9 @@ mod tests {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
MeasurementNameExpression {
|
MeasurementNameExpression {
|
||||||
database: Some(Identifier::Unquoted("telegraf".into())),
|
database: Some("telegraf".into()),
|
||||||
retention_policy: Some(Identifier::Unquoted("autogen".into())),
|
retention_policy: Some("autogen".into()),
|
||||||
name: Identifier::Unquoted("diskio".into()),
|
name: "diskio".into(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -205,9 +216,9 @@ mod tests {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
MeasurementNameExpression {
|
MeasurementNameExpression {
|
||||||
database: Some(Identifier::Unquoted("telegraf".into())),
|
database: Some("telegraf".into()),
|
||||||
retention_policy: None,
|
retention_policy: None,
|
||||||
name: Identifier::Unquoted("diskio".into()),
|
name: "diskio".into(),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -226,10 +237,22 @@ mod tests {
|
||||||
assert_eq!(got, 123);
|
assert_eq!(got, 123);
|
||||||
|
|
||||||
// not digits
|
// not digits
|
||||||
limit_clause("LIMIT sdf").unwrap_err();
|
assert_expect_error!(
|
||||||
|
limit_clause("LIMIT from"),
|
||||||
|
"invalid LIMIT clause, expected unsigned integer"
|
||||||
|
);
|
||||||
|
|
||||||
|
// incomplete input
|
||||||
|
assert_expect_error!(
|
||||||
|
limit_clause("LIMIT "),
|
||||||
|
"invalid LIMIT clause, expected unsigned integer"
|
||||||
|
);
|
||||||
|
|
||||||
// overflow
|
// overflow
|
||||||
limit_clause("LIMIT 34593745733489743985734857394").unwrap_err();
|
assert_expect_error!(
|
||||||
|
limit_clause("LIMIT 34593745733489743985734857394"),
|
||||||
|
"unable to parse unsigned integer"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -246,10 +269,22 @@ mod tests {
|
||||||
assert_eq!(got, 123);
|
assert_eq!(got, 123);
|
||||||
|
|
||||||
// not digits
|
// not digits
|
||||||
offset_clause("OFFSET sdf").unwrap_err();
|
assert_expect_error!(
|
||||||
|
offset_clause("OFFSET from"),
|
||||||
|
"invalid OFFSET clause, expected unsigned integer"
|
||||||
|
);
|
||||||
|
|
||||||
|
// incomplete input
|
||||||
|
assert_expect_error!(
|
||||||
|
offset_clause("OFFSET "),
|
||||||
|
"invalid OFFSET clause, expected unsigned integer"
|
||||||
|
);
|
||||||
|
|
||||||
// overflow
|
// overflow
|
||||||
offset_clause("OFFSET 34593745733489743985734857394").unwrap_err();
|
assert_expect_error!(
|
||||||
|
offset_clause("OFFSET 34593745733489743985734857394"),
|
||||||
|
"unable to parse unsigned integer"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -280,7 +315,10 @@ mod tests {
|
||||||
// Fallible cases
|
// Fallible cases
|
||||||
|
|
||||||
// Must be "time" identifier
|
// Must be "time" identifier
|
||||||
assert_failure!(order_by_clause("ORDER by foo"));
|
assert_expect_error!(
|
||||||
|
order_by_clause("ORDER by foo"),
|
||||||
|
"invalid ORDER BY, expected ASC, DESC or TIME"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -296,4 +334,16 @@ mod tests {
|
||||||
where_clause("WHERE foo = LIMIT 10").unwrap_err();
|
where_clause("WHERE foo = LIMIT 10").unwrap_err();
|
||||||
where_clause("WHERE").unwrap_err();
|
where_clause("WHERE").unwrap_err();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_statement_terminator() {
|
||||||
|
let (i, _) = statement_terminator(";foo").unwrap();
|
||||||
|
assert_eq!(i, "foo");
|
||||||
|
|
||||||
|
let (i, _) = statement_terminator("; foo").unwrap();
|
||||||
|
assert_eq!(i, " foo");
|
||||||
|
|
||||||
|
// Fallible cases
|
||||||
|
statement_terminator("foo").unwrap_err();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use crate::identifier::unquoted_identifier;
|
use crate::identifier::unquoted_identifier;
|
||||||
|
use crate::internal::ParseResult;
|
||||||
use crate::literal::literal_regex;
|
use crate::literal::literal_regex;
|
||||||
use crate::{
|
use crate::{
|
||||||
identifier::{identifier, Identifier},
|
identifier::{identifier, Identifier},
|
||||||
|
@ -13,7 +14,6 @@ use nom::character::complete::{char, multispace0};
|
||||||
use nom::combinator::{cut, map, value};
|
use nom::combinator::{cut, map, value};
|
||||||
use nom::multi::{many0, separated_list0};
|
use nom::multi::{many0, separated_list0};
|
||||||
use nom::sequence::{delimited, preceded, separated_pair, tuple};
|
use nom::sequence::{delimited, preceded, separated_pair, tuple};
|
||||||
use nom::IResult;
|
|
||||||
use std::fmt::{Display, Formatter, Write};
|
use std::fmt::{Display, Formatter, Write};
|
||||||
|
|
||||||
/// An InfluxQL expression of any type.
|
/// An InfluxQL expression of any type.
|
||||||
|
@ -173,7 +173,7 @@ impl Display for BinaryOperator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a unary expression.
|
/// Parse a unary expression.
|
||||||
fn unary(i: &str) -> IResult<&str, Expr> {
|
fn unary(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (i, op) = preceded(
|
let (i, op) = preceded(
|
||||||
multispace0,
|
multispace0,
|
||||||
alt((
|
alt((
|
||||||
|
@ -188,7 +188,7 @@ fn unary(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a parenthesis expression.
|
/// Parse a parenthesis expression.
|
||||||
fn parens(i: &str) -> IResult<&str, Expr> {
|
fn parens(i: &str) -> ParseResult<&str, Expr> {
|
||||||
delimited(
|
delimited(
|
||||||
preceded(multispace0, char('(')),
|
preceded(multispace0, char('(')),
|
||||||
map(conditional_expression, |e| Expr::Nested(e.into())),
|
map(conditional_expression, |e| Expr::Nested(e.into())),
|
||||||
|
@ -197,10 +197,10 @@ fn parens(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a function call expression
|
/// Parse a function call expression
|
||||||
fn call(i: &str) -> IResult<&str, Expr> {
|
fn call(i: &str) -> ParseResult<&str, Expr> {
|
||||||
map(
|
map(
|
||||||
separated_pair(
|
separated_pair(
|
||||||
unquoted_identifier,
|
map(unquoted_identifier, &str::to_string),
|
||||||
multispace0,
|
multispace0,
|
||||||
delimited(
|
delimited(
|
||||||
char('('),
|
char('('),
|
||||||
|
@ -224,7 +224,7 @@ fn call(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an operand expression, such as a literal, identifier or bind parameter.
|
/// Parse an operand expression, such as a literal, identifier or bind parameter.
|
||||||
fn operand(i: &str) -> IResult<&str, Expr> {
|
fn operand(i: &str) -> ParseResult<&str, Expr> {
|
||||||
preceded(
|
preceded(
|
||||||
multispace0,
|
multispace0,
|
||||||
alt((
|
alt((
|
||||||
|
@ -238,14 +238,14 @@ fn operand(i: &str) -> IResult<&str, Expr> {
|
||||||
/// Parse precedence priority 1 operators.
|
/// Parse precedence priority 1 operators.
|
||||||
///
|
///
|
||||||
/// These are the highest precedence operators, and include parenthesis and the unary operators.
|
/// These are the highest precedence operators, and include parenthesis and the unary operators.
|
||||||
fn factor(i: &str) -> IResult<&str, Expr> {
|
fn factor(i: &str) -> ParseResult<&str, Expr> {
|
||||||
alt((unary, parens, operand))(i)
|
alt((unary, parens, operand))(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse arithmetic, precedence priority 2 operators.
|
/// Parse arithmetic, precedence priority 2 operators.
|
||||||
///
|
///
|
||||||
/// This includes the multiplication, division, bitwise and, and modulus operators.
|
/// This includes the multiplication, division, bitwise and, and modulus operators.
|
||||||
fn term(i: &str) -> IResult<&str, Expr> {
|
fn term(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, left) = factor(i)?;
|
let (input, left) = factor(i)?;
|
||||||
let (input, remaining) = many0(tuple((
|
let (input, remaining) = many0(tuple((
|
||||||
preceded(
|
preceded(
|
||||||
|
@ -265,7 +265,7 @@ fn term(i: &str) -> IResult<&str, Expr> {
|
||||||
/// Parse arithmetic, precedence priority 3 operators.
|
/// Parse arithmetic, precedence priority 3 operators.
|
||||||
///
|
///
|
||||||
/// This includes the addition, subtraction, bitwise or, and bitwise xor operators.
|
/// This includes the addition, subtraction, bitwise or, and bitwise xor operators.
|
||||||
fn arithmetic(i: &str) -> IResult<&str, Expr> {
|
fn arithmetic(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, left) = term(i)?;
|
let (input, left) = term(i)?;
|
||||||
let (input, remaining) = many0(tuple((
|
let (input, remaining) = many0(tuple((
|
||||||
preceded(
|
preceded(
|
||||||
|
@ -283,7 +283,7 @@ fn arithmetic(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the conditional regular expression operators `=~` and `!~`.
|
/// Parse the conditional regular expression operators `=~` and `!~`.
|
||||||
fn conditional_regex(i: &str) -> IResult<&str, Expr> {
|
fn conditional_regex(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, f1) = arithmetic(i)?;
|
let (input, f1) = arithmetic(i)?;
|
||||||
let (input, exprs) = many0(tuple((
|
let (input, exprs) = many0(tuple((
|
||||||
preceded(
|
preceded(
|
||||||
|
@ -299,7 +299,7 @@ fn conditional_regex(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse conditional operators.
|
/// Parse conditional operators.
|
||||||
fn conditional(i: &str) -> IResult<&str, Expr> {
|
fn conditional(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, f1) = conditional_regex(i)?;
|
let (input, f1) = conditional_regex(i)?;
|
||||||
let (input, exprs) = many0(tuple((
|
let (input, exprs) = many0(tuple((
|
||||||
preceded(
|
preceded(
|
||||||
|
@ -320,7 +320,7 @@ fn conditional(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse conjunction operators, such as `AND`.
|
/// Parse conjunction operators, such as `AND`.
|
||||||
fn conjunction(i: &str) -> IResult<&str, Expr> {
|
fn conjunction(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, f1) = conditional(i)?;
|
let (input, f1) = conditional(i)?;
|
||||||
let (input, exprs) = many0(tuple((
|
let (input, exprs) = many0(tuple((
|
||||||
value(
|
value(
|
||||||
|
@ -333,7 +333,7 @@ fn conjunction(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse disjunction operator, such as `OR`.
|
/// Parse disjunction operator, such as `OR`.
|
||||||
fn disjunction(i: &str) -> IResult<&str, Expr> {
|
fn disjunction(i: &str) -> ParseResult<&str, Expr> {
|
||||||
let (input, f1) = conjunction(i)?;
|
let (input, f1) = conjunction(i)?;
|
||||||
let (input, exprs) = many0(tuple((
|
let (input, exprs) = many0(tuple((
|
||||||
value(BinaryOperator::Or, preceded(multispace0, tag_no_case("or"))),
|
value(BinaryOperator::Or, preceded(multispace0, tag_no_case("or"))),
|
||||||
|
@ -343,7 +343,7 @@ fn disjunction(i: &str) -> IResult<&str, Expr> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an InfluxQL conditional expression.
|
/// Parse an InfluxQL conditional expression.
|
||||||
pub fn conditional_expression(i: &str) -> IResult<&str, Expr> {
|
pub fn conditional_expression(i: &str) -> ParseResult<&str, Expr> {
|
||||||
disjunction(i)
|
disjunction(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,7 +364,7 @@ mod test {
|
||||||
/// Constructs an [Expr::Identifier] expression.
|
/// Constructs an [Expr::Identifier] expression.
|
||||||
macro_rules! ident {
|
macro_rules! ident {
|
||||||
($EXPR: expr) => {
|
($EXPR: expr) => {
|
||||||
Expr::Identifier(crate::identifier::Identifier::Unquoted($EXPR.into()))
|
Expr::Identifier($EXPR.into())
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,7 +378,7 @@ mod test {
|
||||||
/// Constructs a [Expr::BindParameter] expression.
|
/// Constructs a [Expr::BindParameter] expression.
|
||||||
macro_rules! param {
|
macro_rules! param {
|
||||||
($EXPR: expr) => {
|
($EXPR: expr) => {
|
||||||
Expr::BindParameter(crate::parameter::BindParameter::Unquoted($EXPR.into()).into())
|
Expr::BindParameter(crate::parameter::BindParameter($EXPR.into()).into())
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,7 +603,7 @@ mod test {
|
||||||
// quoted identifier
|
// quoted identifier
|
||||||
let (_, e) = conditional_expression(r#""foo" + 'bar'"#).unwrap();
|
let (_, e) = conditional_expression(r#""foo" + 'bar'"#).unwrap();
|
||||||
let got = format!("{}", e);
|
let got = format!("{}", e);
|
||||||
assert_eq!(got, r#""foo" + 'bar'"#);
|
assert_eq!(got, r#"foo + 'bar'"#);
|
||||||
|
|
||||||
// Duration
|
// Duration
|
||||||
let (_, e) = conditional_expression("- 6h30m").unwrap();
|
let (_, e) = conditional_expression("- 6h30m").unwrap();
|
||||||
|
|
|
@ -13,66 +13,59 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::internal::ParseResult;
|
||||||
use crate::keywords::sql_keyword;
|
use crate::keywords::sql_keyword;
|
||||||
use crate::string::double_quoted_string;
|
use crate::string::double_quoted_string;
|
||||||
use crate::write_escaped;
|
use crate::write_quoted_string;
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::tag;
|
use nom::bytes::complete::tag;
|
||||||
use nom::character::complete::{alpha1, alphanumeric1};
|
use nom::character::complete::{alpha1, alphanumeric1};
|
||||||
use nom::combinator::{map, not, recognize};
|
use nom::combinator::{map, not, recognize};
|
||||||
use nom::multi::many0_count;
|
use nom::multi::many0_count;
|
||||||
use nom::sequence::{pair, preceded};
|
use nom::sequence::{pair, preceded};
|
||||||
use nom::IResult;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::fmt::{Display, Formatter, Write};
|
use std::fmt::{Display, Formatter, Write};
|
||||||
|
|
||||||
/// Parse an unquoted InfluxQL identifier.
|
/// Parse an unquoted InfluxQL identifier.
|
||||||
pub(crate) fn unquoted_identifier(i: &str) -> IResult<&str, String> {
|
pub fn unquoted_identifier(i: &str) -> ParseResult<&str, &str> {
|
||||||
map(
|
preceded(
|
||||||
preceded(
|
not(sql_keyword),
|
||||||
not(sql_keyword),
|
recognize(pair(
|
||||||
recognize(pair(
|
alt((alpha1, tag("_"))),
|
||||||
alt((alpha1, tag("_"))),
|
many0_count(alt((alphanumeric1, tag("_")))),
|
||||||
many0_count(alt((alphanumeric1, tag("_")))),
|
)),
|
||||||
)),
|
|
||||||
),
|
|
||||||
str::to_string,
|
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `Identifier` is a type that represents either a quoted ([`Identifier::Quoted`]) or unquoted ([`Identifier::Unquoted`])
|
/// A type that represents an InfluxQL identifier.
|
||||||
/// InfluxQL identifier.
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
pub enum Identifier {
|
pub struct Identifier(pub String);
|
||||||
/// Contains an unquoted identifier
|
|
||||||
Unquoted(String),
|
|
||||||
|
|
||||||
/// Contains an unescaped quoted identifier
|
impl From<String> for Identifier {
|
||||||
Quoted(String),
|
fn from(s: String) -> Self {
|
||||||
|
Self(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&str> for Identifier {
|
||||||
|
fn from(s: &str) -> Self {
|
||||||
|
Self(s.to_string())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for Identifier {
|
impl Display for Identifier {
|
||||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
write_quoted_string!(f, '"', self.0.as_str(), unquoted_identifier, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
|
||||||
Self::Unquoted(s) => write!(f, "{}", s)?,
|
|
||||||
Self::Quoted(s) => {
|
|
||||||
f.write_char('"')?;
|
|
||||||
// escape characters per https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L576-L583
|
|
||||||
write_escaped!(f, s, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
|
|
||||||
f.write_char('"')?;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses an InfluxQL [Identifier].
|
/// Parses an InfluxQL [Identifier].
|
||||||
pub fn identifier(i: &str) -> IResult<&str, Identifier> {
|
pub fn identifier(i: &str) -> ParseResult<&str, Identifier> {
|
||||||
// See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362
|
// See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362
|
||||||
alt((
|
alt((
|
||||||
map(unquoted_identifier, Identifier::Unquoted),
|
map(unquoted_identifier, Into::into),
|
||||||
map(double_quoted_string, Identifier::Quoted),
|
map(double_quoted_string, Into::into),
|
||||||
))(i)
|
))(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,24 +102,21 @@ mod test {
|
||||||
fn test_identifier() {
|
fn test_identifier() {
|
||||||
// quoted
|
// quoted
|
||||||
let (_, got) = identifier("\"quick draw\"").unwrap();
|
let (_, got) = identifier("\"quick draw\"").unwrap();
|
||||||
assert!(matches!(got, Identifier::Quoted(s) if s == "quick draw"));
|
assert_eq!(got, "quick draw".into());
|
||||||
|
|
||||||
// unquoted
|
// unquoted
|
||||||
let (_, got) = identifier("quick_draw").unwrap();
|
let (_, got) = identifier("quick_draw").unwrap();
|
||||||
assert!(matches!(got, Identifier::Unquoted(s) if s == "quick_draw"));
|
assert_eq!(got, "quick_draw".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_identifier_display() {
|
fn test_identifier_display() {
|
||||||
// test quoted identifier properly escapes specific characters
|
// Identifier properly escapes specific characters and quotes output
|
||||||
let got = format!(
|
let got = format!("{}", Identifier("quick\n\t\\\"'draw \u{1f47d}".into()));
|
||||||
"{}",
|
|
||||||
Identifier::Quoted("quick\n\t\\\"'draw \u{1f47d}".to_string())
|
|
||||||
);
|
|
||||||
assert_eq!(got, r#""quick\n \\\"'draw 👽""#);
|
assert_eq!(got, r#""quick\n \\\"'draw 👽""#);
|
||||||
|
|
||||||
// test unquoted identifier
|
// Identifier displays unquoted output
|
||||||
let got = format!("{}", Identifier::Unquoted("quick_draw".to_string()));
|
let got = format!("{}", Identifier("quick_draw".into()));
|
||||||
assert_eq!(got, "quick_draw");
|
assert_eq!(got, "quick_draw");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
//! Internal result and error types used to build InfluxQL parsers
|
||||||
|
//!
|
||||||
|
use nom::error::{ErrorKind as NomErrorKind, ParseError as NomParseError};
|
||||||
|
use nom::Parser;
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
|
||||||
|
/// This trait must be implemented in order to use the [`map_fail`] and
|
||||||
|
/// [`expect`] functions for generating user-friendly error messages.
|
||||||
|
pub trait ParseError<'a>: NomParseError<&'a str> + Sized {
|
||||||
|
fn from_message(input: &'a str, message: &'static str) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An internal error type used to build InfluxQL parsers.
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
pub enum Error<I> {
|
||||||
|
Syntax { input: I, message: &'static str },
|
||||||
|
Nom(I, NomErrorKind),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I: Display> Display for Error<I> {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Syntax { input: _, message } => {
|
||||||
|
write!(f, "Syntax error: {}", message)?;
|
||||||
|
}
|
||||||
|
Self::Nom(_, kind) => write!(f, "nom error: {:?}", kind)?,
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ParseError<'a> for Error<&'a str> {
|
||||||
|
fn from_message(input: &'a str, message: &'static str) -> Self {
|
||||||
|
Self::Syntax { input, message }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Applies a function returning a [`ParseResult`] over the result of the `parser`.
|
||||||
|
/// If the parser returns an error, the result will be mapped to a [`nom::Err::Failure`]
|
||||||
|
/// with the specified `message` for additional context.
|
||||||
|
pub fn map_fail<'a, O1, O2, E: ParseError<'a>, E2, F, G>(
|
||||||
|
message: &'static str,
|
||||||
|
mut parser: F,
|
||||||
|
mut f: G,
|
||||||
|
) -> impl FnMut(&'a str) -> ParseResult<&'a str, O2, E>
|
||||||
|
where
|
||||||
|
F: Parser<&'a str, O1, E>,
|
||||||
|
G: FnMut(O1) -> Result<O2, E2>,
|
||||||
|
{
|
||||||
|
move |input| {
|
||||||
|
let (input, o1) = parser.parse(input)?;
|
||||||
|
match f(o1) {
|
||||||
|
Ok(o2) => Ok((input, o2)),
|
||||||
|
Err(_) => Err(nom::Err::Failure(E::from_message(input, message))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transforms a [`nom::Err::Error`] to a [`nom::Err::Failure`] using `message` for additional
|
||||||
|
/// context.
|
||||||
|
pub fn expect<'a, E: ParseError<'a>, F, O>(
|
||||||
|
message: &'static str,
|
||||||
|
mut f: F,
|
||||||
|
) -> impl FnMut(&'a str) -> ParseResult<&'a str, O, E>
|
||||||
|
where
|
||||||
|
F: Parser<&'a str, O, E>,
|
||||||
|
{
|
||||||
|
move |i| match f.parse(i) {
|
||||||
|
Ok(o) => Ok(o),
|
||||||
|
Err(nom::Err::Incomplete(i)) => Err(nom::Err::Incomplete(i)),
|
||||||
|
Err(nom::Err::Error(_)) => Err(nom::Err::Failure(E::from_message(i, message))),
|
||||||
|
Err(nom::Err::Failure(e)) => Err(nom::Err::Failure(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> NomParseError<I> for Error<I> {
|
||||||
|
fn from_error_kind(input: I, kind: NomErrorKind) -> Self {
|
||||||
|
Self::Nom(input, kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append(_: I, _: NomErrorKind, other: Self) -> Self {
|
||||||
|
other
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ParseResult is a type alias for [`nom::IResult`] used by nom combinator
|
||||||
|
/// functions for parsing InfluxQL.
|
||||||
|
pub type ParseResult<I, T, E = Error<I>> = nom::IResult<I, T, E>;
|
|
@ -4,14 +4,14 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::internal::ParseResult;
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::{tag, tag_no_case};
|
use nom::bytes::complete::{tag, tag_no_case};
|
||||||
use nom::combinator::{eof, peek};
|
use nom::combinator::{eof, peek};
|
||||||
use nom::sequence::terminated;
|
use nom::sequence::terminated;
|
||||||
use nom::IResult;
|
|
||||||
|
|
||||||
/// Peeks at the input for acceptable characters following a keyword.
|
/// Peeks at the input for acceptable characters following a keyword.
|
||||||
fn keyword_follow_char(i: &str) -> IResult<&str, &str> {
|
fn keyword_follow_char(i: &str) -> ParseResult<&str, &str> {
|
||||||
peek(alt((
|
peek(alt((
|
||||||
tag(" "),
|
tag(" "),
|
||||||
tag("\n"),
|
tag("\n"),
|
||||||
|
@ -26,7 +26,7 @@ fn keyword_follow_char(i: &str) -> IResult<&str, &str> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses the input for matching InfluxQL keywords from ALL to DROP.
|
/// Parses the input for matching InfluxQL keywords from ALL to DROP.
|
||||||
fn keyword_all_to_drop(i: &str) -> IResult<&str, &str> {
|
fn keyword_all_to_drop(i: &str) -> ParseResult<&str, &str> {
|
||||||
alt((
|
alt((
|
||||||
terminated(tag_no_case("ALL"), keyword_follow_char),
|
terminated(tag_no_case("ALL"), keyword_follow_char),
|
||||||
terminated(tag_no_case("ALTER"), keyword_follow_char),
|
terminated(tag_no_case("ALTER"), keyword_follow_char),
|
||||||
|
@ -53,7 +53,7 @@ fn keyword_all_to_drop(i: &str) -> IResult<&str, &str> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses the input for matching InfluxQL keywords from DURATION to LIMIT.
|
/// Parses the input for matching InfluxQL keywords from DURATION to LIMIT.
|
||||||
fn keyword_duration_to_limit(i: &str) -> IResult<&str, &str> {
|
fn keyword_duration_to_limit(i: &str) -> ParseResult<&str, &str> {
|
||||||
alt((
|
alt((
|
||||||
terminated(tag_no_case("DURATION"), keyword_follow_char),
|
terminated(tag_no_case("DURATION"), keyword_follow_char),
|
||||||
terminated(tag_no_case("END"), keyword_follow_char),
|
terminated(tag_no_case("END"), keyword_follow_char),
|
||||||
|
@ -79,7 +79,7 @@ fn keyword_duration_to_limit(i: &str) -> IResult<&str, &str> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses the input for matching InfluxQL keywords from MEASUREMENT to SET.
|
/// Parses the input for matching InfluxQL keywords from MEASUREMENT to SET.
|
||||||
fn keyword_measurement_to_set(i: &str) -> IResult<&str, &str> {
|
fn keyword_measurement_to_set(i: &str) -> ParseResult<&str, &str> {
|
||||||
alt((
|
alt((
|
||||||
terminated(tag_no_case("MEASUREMENT"), keyword_follow_char),
|
terminated(tag_no_case("MEASUREMENT"), keyword_follow_char),
|
||||||
terminated(tag_no_case("MEASUREMENTS"), keyword_follow_char),
|
terminated(tag_no_case("MEASUREMENTS"), keyword_follow_char),
|
||||||
|
@ -106,7 +106,7 @@ fn keyword_measurement_to_set(i: &str) -> IResult<&str, &str> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses the input for matching InfluxQL keywords from SHOW to WRITE.
|
/// Parses the input for matching InfluxQL keywords from SHOW to WRITE.
|
||||||
fn keyword_show_to_write(i: &str) -> IResult<&str, &str> {
|
fn keyword_show_to_write(i: &str) -> ParseResult<&str, &str> {
|
||||||
alt((
|
alt((
|
||||||
terminated(tag_no_case("SHOW"), keyword_follow_char),
|
terminated(tag_no_case("SHOW"), keyword_follow_char),
|
||||||
terminated(tag_no_case("SHARD"), keyword_follow_char),
|
terminated(tag_no_case("SHARD"), keyword_follow_char),
|
||||||
|
@ -127,8 +127,8 @@ fn keyword_show_to_write(i: &str) -> IResult<&str, &str> {
|
||||||
))(i)
|
))(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Matches any InfluxQL reserved keyword.
|
/// Matches any InfluxQL reserved keyword.
|
||||||
pub fn sql_keyword(i: &str) -> IResult<&str, &str> {
|
pub fn sql_keyword(i: &str) -> ParseResult<&str, &str> {
|
||||||
// NOTE that the alt function takes a tuple with a maximum arity of 21, hence
|
// NOTE that the alt function takes a tuple with a maximum arity of 21, hence
|
||||||
// the reason these are broken into groups
|
// the reason these are broken into groups
|
||||||
alt((
|
alt((
|
||||||
|
|
|
@ -11,14 +11,137 @@
|
||||||
clippy::use_self,
|
clippy::use_self,
|
||||||
clippy::clone_on_ref_ptr
|
clippy::clone_on_ref_ptr
|
||||||
)]
|
)]
|
||||||
|
|
||||||
|
use crate::common::statement_terminator;
|
||||||
|
use crate::internal::Error as InternalError;
|
||||||
|
use crate::statement::statement;
|
||||||
|
pub use crate::statement::Statement;
|
||||||
|
use nom::character::complete::multispace0;
|
||||||
|
use nom::combinator::eof;
|
||||||
|
use nom::Offset;
|
||||||
|
use std::fmt::{Debug, Display, Formatter};
|
||||||
|
|
||||||
mod common;
|
mod common;
|
||||||
mod expression;
|
mod expression;
|
||||||
mod identifier;
|
mod identifier;
|
||||||
|
mod internal;
|
||||||
mod keywords;
|
mod keywords;
|
||||||
mod literal;
|
mod literal;
|
||||||
mod parameter;
|
mod parameter;
|
||||||
|
mod show;
|
||||||
mod show_measurements;
|
mod show_measurements;
|
||||||
|
mod statement;
|
||||||
mod string;
|
mod string;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test_util;
|
mod test_util;
|
||||||
|
|
||||||
|
/// A error returned when parsing an InfluxQL query using
|
||||||
|
/// [`parse_statements`] fails.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub struct ParseError {
|
||||||
|
message: String,
|
||||||
|
pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for ParseError {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{} at pos {}", self.message, self.pos)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ParseResult is type that represents the success or failure of parsing
|
||||||
|
/// a given input into a set of InfluxQL statements.
|
||||||
|
///
|
||||||
|
/// Errors are human-readable messages indicating the cause of the parse failure.
|
||||||
|
pub type ParseResult = Result<Vec<Statement>, ParseError>;
|
||||||
|
|
||||||
|
/// Parse the input into a set of InfluxQL statements.
|
||||||
|
pub fn parse_statements(input: &str) -> ParseResult {
|
||||||
|
let mut res = Vec::new();
|
||||||
|
let mut i: &str = input;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Consume whitespace from the input
|
||||||
|
i = match multispace0::<_, nom::error::Error<_>>(i) {
|
||||||
|
Ok((i1, _)) => i1,
|
||||||
|
_ => unreachable!("multispace0 is infallible"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if eof::<_, nom::error::Error<_>>(i).is_ok() {
|
||||||
|
return Ok(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok((i1, _)) = statement_terminator(i) {
|
||||||
|
i = i1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match statement(i) {
|
||||||
|
Ok((i1, o)) => {
|
||||||
|
res.push(o);
|
||||||
|
i = i1;
|
||||||
|
}
|
||||||
|
Err(nom::Err::Failure(InternalError::Syntax {
|
||||||
|
input: pos,
|
||||||
|
message,
|
||||||
|
})) => {
|
||||||
|
return Err(ParseError {
|
||||||
|
message: message.into(),
|
||||||
|
pos: input.offset(pos),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// any other error indicates an invalid statement
|
||||||
|
Err(_) => {
|
||||||
|
return Err(ParseError {
|
||||||
|
message: "invalid SQL statement".into(),
|
||||||
|
pos: input.offset(i),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use crate::parse_statements;
|
||||||
|
|
||||||
|
/// Validates that the [`parse_statements`] function
|
||||||
|
/// handles statement terminators and errors.
|
||||||
|
#[test]
|
||||||
|
fn test_parse_statements() {
|
||||||
|
// Parse a single statement, without a terminator
|
||||||
|
let got = parse_statements("SHOW MEASUREMENTS").unwrap();
|
||||||
|
assert_eq!(format!("{}", got.first().unwrap()), "SHOW MEASUREMENTS");
|
||||||
|
|
||||||
|
// Parse a single statement, with a terminator
|
||||||
|
let got = parse_statements("SHOW MEASUREMENTS;").unwrap();
|
||||||
|
assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS");
|
||||||
|
|
||||||
|
// Parse multiple statements with whitespace
|
||||||
|
let got = parse_statements("SHOW MEASUREMENTS;\nSHOW MEASUREMENTS LIMIT 1").unwrap();
|
||||||
|
assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS");
|
||||||
|
assert_eq!(format!("{}", got[1]), "SHOW MEASUREMENTS LIMIT 1");
|
||||||
|
|
||||||
|
// Parse multiple statements with a terminator in quotes, ensuring it is not interpreted as
|
||||||
|
// a terminator
|
||||||
|
let got = parse_statements(
|
||||||
|
"SHOW MEASUREMENTS WITH MEASUREMENT = \";\";SHOW MEASUREMENTS LIMIT 1",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
format!("{}", got[0]),
|
||||||
|
"SHOW MEASUREMENTS WITH MEASUREMENT = \";\""
|
||||||
|
);
|
||||||
|
assert_eq!(format!("{}", got[1]), "SHOW MEASUREMENTS LIMIT 1");
|
||||||
|
|
||||||
|
// Returns error for invalid statement
|
||||||
|
let got = parse_statements("BAD SQL").unwrap_err();
|
||||||
|
assert_eq!(format!("{}", got), "invalid SQL statement at pos 0");
|
||||||
|
|
||||||
|
// Returns error for invalid statement after first
|
||||||
|
let got = parse_statements("SHOW MEASUREMENTS;BAD SQL").unwrap_err();
|
||||||
|
assert_eq!(format!("{}", got), "invalid SQL statement at pos 18");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::internal::{map_fail, ParseResult};
|
||||||
use crate::string::{regex, single_quoted_string, Regex};
|
use crate::string::{regex, single_quoted_string, Regex};
|
||||||
use crate::write_escaped;
|
use crate::write_escaped;
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::{tag, tag_no_case};
|
use nom::bytes::complete::{tag, tag_no_case};
|
||||||
use nom::character::complete::digit1;
|
use nom::character::complete::digit1;
|
||||||
use nom::combinator::{map, map_res, recognize, value};
|
use nom::combinator::{map, recognize, value};
|
||||||
use nom::multi::fold_many1;
|
use nom::multi::fold_many1;
|
||||||
use nom::sequence::{pair, separated_pair};
|
use nom::sequence::{pair, separated_pair};
|
||||||
use nom::IResult;
|
|
||||||
use std::fmt::{Display, Formatter, Write};
|
use std::fmt::{Display, Formatter, Write};
|
||||||
|
|
||||||
/// Number of nanoseconds in a microsecond.
|
/// Number of nanoseconds in a microsecond.
|
||||||
|
@ -110,8 +110,8 @@ impl Display for Literal {
|
||||||
/// ```text
|
/// ```text
|
||||||
/// INTEGER ::= [0-9]+
|
/// INTEGER ::= [0-9]+
|
||||||
/// ```
|
/// ```
|
||||||
fn integer(i: &str) -> IResult<&str, i64> {
|
fn integer(i: &str) -> ParseResult<&str, i64> {
|
||||||
map_res(digit1, |s: &str| s.parse())(i)
|
map_fail("unable to parse integer", digit1, &str::parse)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an unsigned InfluxQL integer.
|
/// Parse an unsigned InfluxQL integer.
|
||||||
|
@ -121,8 +121,8 @@ fn integer(i: &str) -> IResult<&str, i64> {
|
||||||
/// ```text
|
/// ```text
|
||||||
/// INTEGER ::= [0-9]+
|
/// INTEGER ::= [0-9]+
|
||||||
/// ```
|
/// ```
|
||||||
fn unsigned_integer(i: &str) -> IResult<&str, u64> {
|
fn unsigned_integer(i: &str) -> ParseResult<&str, u64> {
|
||||||
map_res(digit1, |s: &str| s.parse())(i)
|
map_fail("unable to parse unsigned integer", digit1, &str::parse)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an unsigned InfluxQL floating point number.
|
/// Parse an unsigned InfluxQL floating point number.
|
||||||
|
@ -133,15 +133,16 @@ fn unsigned_integer(i: &str) -> IResult<&str, u64> {
|
||||||
/// float ::= INTEGER "." INTEGER
|
/// float ::= INTEGER "." INTEGER
|
||||||
/// INTEGER ::= [0-9]+
|
/// INTEGER ::= [0-9]+
|
||||||
/// ```
|
/// ```
|
||||||
fn float(i: &str) -> IResult<&str, f64> {
|
fn float(i: &str) -> ParseResult<&str, f64> {
|
||||||
map_res(
|
map_fail(
|
||||||
|
"unable to parse float",
|
||||||
recognize(separated_pair(digit1, tag("."), digit1)),
|
recognize(separated_pair(digit1, tag("."), digit1)),
|
||||||
|s: &str| s.parse(),
|
&str::parse,
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the input for an InfluxQL boolean, which must be the value `true` or `false`.
|
/// Parse the input for an InfluxQL boolean, which must be the value `true` or `false`.
|
||||||
fn boolean(i: &str) -> IResult<&str, bool> {
|
fn boolean(i: &str) -> ParseResult<&str, bool> {
|
||||||
alt((
|
alt((
|
||||||
value(true, tag_no_case("true")),
|
value(true, tag_no_case("true")),
|
||||||
value(false, tag_no_case("false")),
|
value(false, tag_no_case("false")),
|
||||||
|
@ -202,7 +203,7 @@ impl Display for Duration {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the input for a InfluxQL duration fragment and returns the value in nanoseconds.
|
/// Parse the input for a InfluxQL duration fragment and returns the value in nanoseconds.
|
||||||
fn single_duration(i: &str) -> IResult<&str, i64> {
|
fn single_duration(i: &str) -> ParseResult<&str, i64> {
|
||||||
use DurationUnit::*;
|
use DurationUnit::*;
|
||||||
|
|
||||||
map(
|
map(
|
||||||
|
@ -234,7 +235,7 @@ fn single_duration(i: &str) -> IResult<&str, i64> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the input for an InfluxQL duration and returns the value in nanoseconds.
|
/// Parse the input for an InfluxQL duration and returns the value in nanoseconds.
|
||||||
fn duration(i: &str) -> IResult<&str, Duration> {
|
fn duration(i: &str) -> ParseResult<&str, Duration> {
|
||||||
map(
|
map(
|
||||||
fold_many1(single_duration, || 0, |acc, fragment| acc + fragment),
|
fold_many1(single_duration, || 0, |acc, fragment| acc + fragment),
|
||||||
Duration,
|
Duration,
|
||||||
|
@ -244,7 +245,7 @@ fn duration(i: &str) -> IResult<&str, Duration> {
|
||||||
/// Parse an InfluxQL literal, except a [`Regex`].
|
/// Parse an InfluxQL literal, except a [`Regex`].
|
||||||
///
|
///
|
||||||
/// See [`literal_regex`] for parsing literal regular expressions.
|
/// See [`literal_regex`] for parsing literal regular expressions.
|
||||||
pub fn literal(i: &str) -> IResult<&str, Literal> {
|
pub fn literal(i: &str) -> ParseResult<&str, Literal> {
|
||||||
alt((
|
alt((
|
||||||
// NOTE: order is important, as floats should be tested before durations and integers.
|
// NOTE: order is important, as floats should be tested before durations and integers.
|
||||||
map(float, Literal::Float),
|
map(float, Literal::Float),
|
||||||
|
@ -256,7 +257,7 @@ pub fn literal(i: &str) -> IResult<&str, Literal> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse an InfluxQL literal regular expression.
|
/// Parse an InfluxQL literal regular expression.
|
||||||
pub fn literal_regex(i: &str) -> IResult<&str, Literal> {
|
pub fn literal_regex(i: &str) -> ParseResult<&str, Literal> {
|
||||||
map(regex, Literal::Regex)(i)
|
map(regex, Literal::Regex)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,61 +9,55 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::internal::ParseResult;
|
||||||
use crate::string::double_quoted_string;
|
use crate::string::double_quoted_string;
|
||||||
use crate::write_escaped;
|
use crate::write_quoted_string;
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::tag;
|
use nom::bytes::complete::tag;
|
||||||
use nom::character::complete::{alphanumeric1, char};
|
use nom::character::complete::{alphanumeric1, char};
|
||||||
use nom::combinator::{map, recognize};
|
use nom::combinator::{map, recognize};
|
||||||
use nom::multi::many1_count;
|
use nom::multi::many1_count;
|
||||||
use nom::sequence::preceded;
|
use nom::sequence::preceded;
|
||||||
use nom::IResult;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::fmt::{Display, Formatter, Write};
|
use std::fmt::{Display, Formatter, Write};
|
||||||
|
|
||||||
/// Parse an unquoted InfluxQL bind parameter.
|
/// Parse an unquoted InfluxQL bind parameter.
|
||||||
fn unquoted_parameter(i: &str) -> IResult<&str, String> {
|
fn unquoted_parameter(i: &str) -> ParseResult<&str, &str> {
|
||||||
map(
|
recognize(many1_count(alt((alphanumeric1, tag("_")))))(i)
|
||||||
recognize(many1_count(alt((alphanumeric1, tag("_"))))),
|
|
||||||
str::to_string,
|
|
||||||
)(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `BindParameter` is a type that represents either a quoted ([`BindParameter::Quoted`]) or unquoted ([`BindParameter::Unquoted`])
|
/// A type that represents an InfluxQL bind parameter.
|
||||||
/// InfluxQL bind parameter.
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
pub enum BindParameter {
|
pub struct BindParameter(pub String);
|
||||||
/// Contains an unquoted bind parameter
|
|
||||||
Unquoted(String),
|
|
||||||
|
|
||||||
/// Contains an unescaped quoted identifier
|
impl From<String> for BindParameter {
|
||||||
Quoted(String),
|
fn from(s: String) -> Self {
|
||||||
|
Self(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&str> for BindParameter {
|
||||||
|
fn from(s: &str) -> Self {
|
||||||
|
Self(s.to_string())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for BindParameter {
|
impl Display for BindParameter {
|
||||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
f.write_char('$')?;
|
||||||
Self::Unquoted(s) => write!(f, "${}", s)?,
|
write_quoted_string!(f, '"', self.0.as_str(), unquoted_parameter, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
|
||||||
Self::Quoted(s) => {
|
|
||||||
f.write_str("$\"")?;
|
|
||||||
// escape characters per https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L576-L583
|
|
||||||
write_escaped!(f, s, '\n' => "\\n", '\\' => "\\\\", '"' => "\\\"");
|
|
||||||
f.write_char('"')?;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses an InfluxQL [BindParameter].
|
/// Parses an InfluxQL [BindParameter].
|
||||||
pub fn parameter(i: &str) -> IResult<&str, BindParameter> {
|
pub fn parameter(i: &str) -> ParseResult<&str, BindParameter> {
|
||||||
// See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362
|
// See: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L358-L362
|
||||||
preceded(
|
preceded(
|
||||||
char('$'),
|
char('$'),
|
||||||
alt((
|
alt((
|
||||||
map(unquoted_parameter, BindParameter::Unquoted),
|
map(unquoted_parameter, Into::into),
|
||||||
map(double_quoted_string, BindParameter::Quoted),
|
map(double_quoted_string, Into::into),
|
||||||
)),
|
)),
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
@ -76,41 +70,44 @@ mod test {
|
||||||
fn test_parameter() {
|
fn test_parameter() {
|
||||||
// all ascii
|
// all ascii
|
||||||
let (_, got) = parameter("$cpu").unwrap();
|
let (_, got) = parameter("$cpu").unwrap();
|
||||||
assert_eq!(got, BindParameter::Unquoted("cpu".into()));
|
assert_eq!(got, "cpu".into());
|
||||||
|
|
||||||
// digits
|
// digits
|
||||||
let (_, got) = parameter("$01").unwrap();
|
let (_, got) = parameter("$01").unwrap();
|
||||||
assert_eq!(got, BindParameter::Unquoted("01".into()));
|
assert_eq!(got, "01".into());
|
||||||
|
|
||||||
// all valid chars
|
// all valid chars
|
||||||
let (_, got) = parameter("$cpu_0").unwrap();
|
let (_, got) = parameter("$cpu_0").unwrap();
|
||||||
assert_eq!(got, BindParameter::Unquoted("cpu_0".into()));
|
assert_eq!(got, "cpu_0".into());
|
||||||
|
|
||||||
// keyword
|
// keyword
|
||||||
let (_, got) = parameter("$from").unwrap();
|
let (_, got) = parameter("$from").unwrap();
|
||||||
assert_eq!(got, BindParameter::Unquoted("from".into()));
|
assert_eq!(got, "from".into());
|
||||||
|
|
||||||
// quoted
|
// quoted
|
||||||
let (_, got) = parameter("$\"quick draw\"").unwrap();
|
let (_, got) = parameter("$\"quick draw\"").unwrap();
|
||||||
assert!(matches!(got, BindParameter::Quoted(s) if s == "quick draw"));
|
assert_eq!(got, "quick draw".into());
|
||||||
|
|
||||||
// ┌─────────────────────────────┐
|
// ┌─────────────────────────────┐
|
||||||
// │ Fallible tests │
|
// │ Fallible tests │
|
||||||
// └─────────────────────────────┘
|
// └─────────────────────────────┘
|
||||||
|
|
||||||
// missing `$` prefix
|
// missing `$` prefix
|
||||||
let res = parameter("cpu");
|
parameter("cpu").unwrap_err();
|
||||||
assert!(res.is_err());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bind_parameter_display() {
|
fn test_bind_parameter_display() {
|
||||||
// test quoted identifier properly escapes specific characters
|
// BindParameter displays quoted output
|
||||||
let got = format!("{}", BindParameter::Quoted("from".to_string()));
|
let got = format!("{}", BindParameter("from foo".into()));
|
||||||
assert_eq!(got, r#"$"from""#);
|
assert_eq!(got, r#"$"from foo""#);
|
||||||
|
|
||||||
// test unquoted identifier
|
// BindParameter displays quoted and escaped output
|
||||||
let got = format!("{}", BindParameter::Unquoted("quick_draw".to_string()));
|
let got = format!("{}", BindParameter("from\nfoo".into()));
|
||||||
|
assert_eq!(got, r#"$"from\nfoo""#);
|
||||||
|
|
||||||
|
// BindParameter displays unquoted output
|
||||||
|
let got = format!("{}", BindParameter("quick_draw".into()));
|
||||||
assert_eq!(got, "$quick_draw");
|
assert_eq!(got, "$quick_draw");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
use crate::internal::{expect, ParseResult};
|
||||||
|
use crate::show_measurements::show_measurements;
|
||||||
|
use crate::Statement;
|
||||||
|
use nom::bytes::complete::tag_no_case;
|
||||||
|
use nom::character::complete::multispace1;
|
||||||
|
use nom::combinator::map;
|
||||||
|
use nom::sequence::{pair, preceded};
|
||||||
|
|
||||||
|
/// Parse a SHOW statement.
|
||||||
|
pub fn show_statement(i: &str) -> ParseResult<&str, Statement> {
|
||||||
|
preceded(
|
||||||
|
pair(tag_no_case("SHOW"), multispace1),
|
||||||
|
expect(
|
||||||
|
"invalid SHOW statement, expected MEASUREMENTS",
|
||||||
|
// NOTE: This will become an alt(()) once more statements are added
|
||||||
|
map(show_measurements, |v| {
|
||||||
|
Statement::ShowMeasurements(Box::new(v))
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
use crate::assert_expect_error;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_show_statement() {
|
||||||
|
let (_, got) = show_statement("SHOW MEASUREMENTS").unwrap();
|
||||||
|
assert_eq!(format!("{}", got), "SHOW MEASUREMENTS");
|
||||||
|
|
||||||
|
// Fallible case
|
||||||
|
|
||||||
|
// Unsupported SHOW
|
||||||
|
assert_expect_error!(
|
||||||
|
show_statement("SHOW TAG KEYS"),
|
||||||
|
"invalid SHOW statement, expected MEASUREMENTS"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,17 +4,18 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use crate::internal::{expect, ParseResult};
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::{tag, tag_no_case};
|
use nom::bytes::complete::{tag, tag_no_case};
|
||||||
use nom::character::complete::{char, multispace1};
|
use nom::character::complete::{char, multispace1};
|
||||||
use nom::combinator::{map, opt, value};
|
use nom::combinator::{map, opt, value};
|
||||||
|
use nom::sequence::tuple;
|
||||||
use nom::sequence::{pair, preceded, terminated};
|
use nom::sequence::{pair, preceded, terminated};
|
||||||
use nom::{sequence::tuple, IResult};
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::fmt::Formatter;
|
use std::fmt::Formatter;
|
||||||
|
|
||||||
use crate::common::{
|
use crate::common::{
|
||||||
limit_clause, measurement_name_expression, offset_clause, statement_terminator, where_clause,
|
limit_clause, measurement_name_expression, offset_clause, where_clause,
|
||||||
MeasurementNameExpression,
|
MeasurementNameExpression,
|
||||||
};
|
};
|
||||||
use crate::expression::Expr;
|
use crate::expression::Expr;
|
||||||
|
@ -43,20 +44,23 @@ impl fmt::Display for OnExpression {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the `ON` expression of the `SHOW MEASUREMENTS` statement.
|
/// Parse the `ON` expression of the `SHOW MEASUREMENTS` statement.
|
||||||
fn on_expression(i: &str) -> IResult<&str, OnExpression> {
|
fn on_expression(i: &str) -> ParseResult<&str, OnExpression> {
|
||||||
preceded(
|
preceded(
|
||||||
pair(tag_no_case("ON"), multispace1),
|
pair(tag_no_case("ON"), multispace1),
|
||||||
alt((
|
expect(
|
||||||
value(OnExpression::AllDatabasesAndRetentionPolicies, tag("*.*")),
|
"invalid ON clause, expected wildcard or identifier",
|
||||||
value(OnExpression::AllDatabases, tag("*")),
|
alt((
|
||||||
map(
|
value(OnExpression::AllDatabasesAndRetentionPolicies, tag("*.*")),
|
||||||
pair(opt(terminated(identifier, tag("."))), identifier),
|
value(OnExpression::AllDatabases, tag("*")),
|
||||||
|tup| match tup {
|
map(
|
||||||
(None, db) => OnExpression::Database(db),
|
pair(opt(terminated(identifier, tag("."))), identifier),
|
||||||
(Some(db), rp) => OnExpression::DatabaseRetentionPolicy(db, rp),
|
|tup| match tup {
|
||||||
},
|
(None, db) => OnExpression::Database(db),
|
||||||
),
|
(Some(db), rp) => OnExpression::DatabaseRetentionPolicy(db, rp),
|
||||||
)),
|
},
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
),
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,50 +118,63 @@ impl fmt::Display for MeasurementExpression {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_measurement_expression(i: &str) -> IResult<&str, MeasurementExpression> {
|
fn with_measurement_expression(i: &str) -> ParseResult<&str, MeasurementExpression> {
|
||||||
preceded(
|
preceded(
|
||||||
tuple((
|
tuple((
|
||||||
tag_no_case("with"),
|
tag_no_case("WITH"),
|
||||||
multispace1,
|
multispace1,
|
||||||
tag_no_case("measurement"),
|
expect(
|
||||||
multispace1,
|
"invalid WITH clause, expected MEASUREMENT",
|
||||||
)),
|
tag_no_case("measurement"),
|
||||||
alt((
|
|
||||||
map(
|
|
||||||
tuple((char('='), multispace1, measurement_name_expression)),
|
|
||||||
|(_, _, name)| MeasurementExpression::Equals(name),
|
|
||||||
),
|
),
|
||||||
map(tuple((tag("=~"), multispace1, regex)), |(_, _, regex)| {
|
multispace1,
|
||||||
MeasurementExpression::Regex(regex)
|
|
||||||
}),
|
|
||||||
)),
|
)),
|
||||||
|
expect(
|
||||||
|
"expected = or =~",
|
||||||
|
alt((
|
||||||
|
map(
|
||||||
|
tuple((
|
||||||
|
char('='),
|
||||||
|
multispace1,
|
||||||
|
expect(
|
||||||
|
"expected measurement name or wildcard",
|
||||||
|
measurement_name_expression,
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
|(_, _, name)| MeasurementExpression::Equals(name),
|
||||||
|
),
|
||||||
|
map(
|
||||||
|
tuple((
|
||||||
|
tag("=~"),
|
||||||
|
multispace1,
|
||||||
|
expect("expected regex literal", regex),
|
||||||
|
)),
|
||||||
|
|(_, _, regex)| MeasurementExpression::Regex(regex),
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
),
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn show_measurements(i: &str) -> IResult<&str, ShowMeasurementsStatement> {
|
/// Parse a `SHOW MEASUREMENTS` statement after `SHOW` and any whitespace has been consumed.
|
||||||
|
pub fn show_measurements(i: &str) -> ParseResult<&str, ShowMeasurementsStatement> {
|
||||||
let (
|
let (
|
||||||
remaining_input,
|
remaining_input,
|
||||||
(
|
(
|
||||||
_, // "SHOW"
|
|
||||||
_, // <ws>
|
|
||||||
_, // "MEASUREMENTS"
|
_, // "MEASUREMENTS"
|
||||||
on_expression,
|
on_expression,
|
||||||
measurement_expression,
|
measurement_expression,
|
||||||
condition,
|
condition,
|
||||||
limit,
|
limit,
|
||||||
offset,
|
offset,
|
||||||
_, // ";"
|
|
||||||
),
|
),
|
||||||
) = tuple((
|
) = tuple((
|
||||||
tag_no_case("show"),
|
tag_no_case("MEASUREMENTS"),
|
||||||
multispace1,
|
|
||||||
tag_no_case("measurements"),
|
|
||||||
opt(preceded(multispace1, on_expression)),
|
opt(preceded(multispace1, on_expression)),
|
||||||
opt(preceded(multispace1, with_measurement_expression)),
|
opt(preceded(multispace1, with_measurement_expression)),
|
||||||
opt(preceded(multispace1, where_clause)),
|
opt(preceded(multispace1, where_clause)),
|
||||||
opt(preceded(multispace1, limit_clause)),
|
opt(preceded(multispace1, limit_clause)),
|
||||||
opt(preceded(multispace1, offset_clause)),
|
opt(preceded(multispace1, offset_clause)),
|
||||||
statement_terminator,
|
|
||||||
))(i)?;
|
))(i)?;
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
|
@ -175,10 +192,11 @@ pub fn show_measurements(i: &str) -> IResult<&str, ShowMeasurementsStatement> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::assert_expect_error;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_show_measurements() {
|
fn test_show_measurements() {
|
||||||
let (_, got) = show_measurements("SHOW measurements;").unwrap();
|
let (_, got) = show_measurements("measurements").unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
|
@ -187,28 +205,28 @@ mod test {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_, got) = show_measurements("SHOW measurements ON foo;").unwrap();
|
let (_, got) = show_measurements("measurements ON foo").unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))),
|
on_expression: Some(OnExpression::Database("foo".into())),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_, got) = show_measurements(
|
let (_, got) = show_measurements(
|
||||||
"SHOW\nMEASUREMENTS\tON foo WITH MEASUREMENT\n= bar WHERE\ntrue LIMIT 10 OFFSET 20;",
|
"MEASUREMENTS\tON foo WITH MEASUREMENT\n= bar WHERE\ntrue LIMIT 10 OFFSET 20",
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))),
|
on_expression: Some(OnExpression::Database("foo".into())),
|
||||||
measurement_expression: Some(MeasurementExpression::Equals(
|
measurement_expression: Some(MeasurementExpression::Equals(
|
||||||
MeasurementNameExpression {
|
MeasurementNameExpression {
|
||||||
database: None,
|
database: None,
|
||||||
retention_policy: None,
|
retention_policy: None,
|
||||||
name: Identifier::Unquoted("bar".into()),
|
name: "bar".into(),
|
||||||
}
|
}
|
||||||
)),
|
)),
|
||||||
condition: Some(Expr::Literal(true.into())),
|
condition: Some(Expr::Literal(true.into())),
|
||||||
|
@ -221,14 +239,13 @@ mod test {
|
||||||
"SHOW MEASUREMENTS ON foo WITH MEASUREMENT = bar WHERE true LIMIT 10 OFFSET 20"
|
"SHOW MEASUREMENTS ON foo WITH MEASUREMENT = bar WHERE true LIMIT 10 OFFSET 20"
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_, got) = show_measurements(
|
let (_, got) =
|
||||||
"SHOW\nMEASUREMENTS\tON foo WITH MEASUREMENT\n=~ /bar/ WHERE\ntrue;",
|
show_measurements("MEASUREMENTS\tON foo WITH MEASUREMENT\n=~ /bar/ WHERE\ntrue")
|
||||||
)
|
.unwrap();
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
got,
|
got,
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))),
|
on_expression: Some(OnExpression::Database("foo".into())),
|
||||||
measurement_expression: Some(MeasurementExpression::Regex(Regex("bar".into()))),
|
measurement_expression: Some(MeasurementExpression::Regex(Regex("bar".into()))),
|
||||||
condition: Some(Expr::Literal(true.into())),
|
condition: Some(Expr::Literal(true.into())),
|
||||||
limit: None,
|
limit: None,
|
||||||
|
@ -255,7 +272,7 @@ mod test {
|
||||||
let got = format!(
|
let got = format!(
|
||||||
"{}",
|
"{}",
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
on_expression: Some(OnExpression::Database(Identifier::Unquoted("foo".into()))),
|
on_expression: Some(OnExpression::Database("foo".into())),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -265,8 +282,8 @@ mod test {
|
||||||
"{}",
|
"{}",
|
||||||
ShowMeasurementsStatement {
|
ShowMeasurementsStatement {
|
||||||
on_expression: Some(OnExpression::DatabaseRetentionPolicy(
|
on_expression: Some(OnExpression::DatabaseRetentionPolicy(
|
||||||
Identifier::Unquoted("foo".into()),
|
"foo".into(),
|
||||||
Identifier::Unquoted("bar".into())
|
"bar".into()
|
||||||
)),
|
)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
|
@ -295,11 +312,12 @@ mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_on_expression() {
|
fn test_on_expression() {
|
||||||
let (_, got) = on_expression("ON cpu").unwrap();
|
let (_, got) = on_expression("ON cpu").unwrap();
|
||||||
assert!(matches!(got, OnExpression::Database(Identifier::Unquoted(db)) if db == "cpu"));
|
assert_eq!(got, OnExpression::Database("cpu".into()));
|
||||||
|
|
||||||
let (_, got) = on_expression("ON cpu.autogen").unwrap();
|
let (_, got) = on_expression("ON cpu.autogen").unwrap();
|
||||||
assert!(
|
assert_eq!(
|
||||||
matches!(got, OnExpression::DatabaseRetentionPolicy(Identifier::Unquoted(db), Identifier::Unquoted(rp)) if db == "cpu" && rp == "autogen")
|
got,
|
||||||
|
OnExpression::DatabaseRetentionPolicy("cpu".into(), "autogen".into())
|
||||||
);
|
);
|
||||||
|
|
||||||
let (_, got) = on_expression("ON *").unwrap();
|
let (_, got) = on_expression("ON *").unwrap();
|
||||||
|
@ -310,6 +328,11 @@ mod test {
|
||||||
got,
|
got,
|
||||||
OnExpression::AllDatabasesAndRetentionPolicies
|
OnExpression::AllDatabasesAndRetentionPolicies
|
||||||
));
|
));
|
||||||
|
|
||||||
|
assert_expect_error!(
|
||||||
|
on_expression("ON WHERE cpu = 'test'"),
|
||||||
|
"invalid ON clause, expected wildcard or identifier"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -320,7 +343,7 @@ mod test {
|
||||||
MeasurementExpression::Equals(MeasurementNameExpression {
|
MeasurementExpression::Equals(MeasurementNameExpression {
|
||||||
database: None,
|
database: None,
|
||||||
retention_policy: None,
|
retention_policy: None,
|
||||||
name: Identifier::Unquoted("foo".into())
|
name: "foo".into()
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -329,16 +352,34 @@ mod test {
|
||||||
|
|
||||||
// Fallible cases
|
// Fallible cases
|
||||||
|
|
||||||
|
// Missing MEASUREMENT token
|
||||||
|
assert_expect_error!(
|
||||||
|
with_measurement_expression("WITH =~ foo"),
|
||||||
|
"invalid WITH clause, expected MEASUREMENT"
|
||||||
|
);
|
||||||
|
|
||||||
// Must have a regex for equal regex operator
|
// Must have a regex for equal regex operator
|
||||||
with_measurement_expression("WITH measurement =~ foo").unwrap_err();
|
assert_expect_error!(
|
||||||
|
with_measurement_expression("WITH measurement =~ foo"),
|
||||||
|
"expected regex literal"
|
||||||
|
);
|
||||||
|
|
||||||
// Unsupported regex not equal operator
|
// Unsupported regex not equal operator
|
||||||
with_measurement_expression("WITH measurement !~ foo").unwrap_err();
|
assert_expect_error!(
|
||||||
|
with_measurement_expression("WITH measurement !~ foo"),
|
||||||
|
"expected = or =~"
|
||||||
|
);
|
||||||
|
|
||||||
// Must have an identifier for equal operator
|
// Must have an identifier for equal operator
|
||||||
with_measurement_expression("WITH measurement = /foo/").unwrap_err();
|
assert_expect_error!(
|
||||||
|
with_measurement_expression("WITH measurement = /foo/"),
|
||||||
|
"expected measurement name or wildcard"
|
||||||
|
);
|
||||||
|
|
||||||
// Must have an identifier
|
// Must have an identifier
|
||||||
with_measurement_expression("WITH measurement = 1").unwrap_err();
|
assert_expect_error!(
|
||||||
|
with_measurement_expression("WITH measurement = 1"),
|
||||||
|
"expected measurement name or wildcard"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
use crate::internal::ParseResult;
|
||||||
|
use crate::show::show_statement;
|
||||||
|
use crate::show_measurements::ShowMeasurementsStatement;
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
|
||||||
|
/// An InfluxQL statement.
|
||||||
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
|
pub enum Statement {
|
||||||
|
/// Represents a `SHOW MEASUREMENTS` statement.
|
||||||
|
ShowMeasurements(Box<ShowMeasurementsStatement>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Statement {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::ShowMeasurements(s) => write!(f, "{}", s)?,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse a single InfluxQL statement.
|
||||||
|
pub fn statement(i: &str) -> ParseResult<&str, Statement> {
|
||||||
|
// NOTE: This will become an alt(()) once more statements are added
|
||||||
|
show_statement(i)
|
||||||
|
}
|
|
@ -6,6 +6,7 @@
|
||||||
// Taken liberally from https://github.com/Geal/nom/blob/main/examples/string.rs and
|
// Taken liberally from https://github.com/Geal/nom/blob/main/examples/string.rs and
|
||||||
// amended for InfluxQL.
|
// amended for InfluxQL.
|
||||||
|
|
||||||
|
use crate::internal::{expect, ParseError, ParseResult};
|
||||||
use nom::branch::alt;
|
use nom::branch::alt;
|
||||||
use nom::bytes::complete::{is_not, tag};
|
use nom::bytes::complete::{is_not, tag};
|
||||||
use nom::character::complete::char;
|
use nom::character::complete::char;
|
||||||
|
@ -13,23 +14,45 @@ use nom::combinator::{map, value, verify};
|
||||||
use nom::error::Error;
|
use nom::error::Error;
|
||||||
use nom::multi::fold_many0;
|
use nom::multi::fold_many0;
|
||||||
use nom::sequence::{delimited, preceded};
|
use nom::sequence::{delimited, preceded};
|
||||||
use nom::{IResult, Parser};
|
use nom::Parser;
|
||||||
use std::fmt::{Display, Formatter, Write};
|
use std::fmt::{Display, Formatter, Write};
|
||||||
|
|
||||||
/// Writes `s` to `f`, mapping any characters from => to their escaped equivalents.
|
/// Writes `S` to `F`, mapping any characters `FROM` => `TO` their escaped equivalents.
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! write_escaped {
|
macro_rules! write_escaped {
|
||||||
($f: expr, $s: expr $(, $from:expr => $to:expr)+) => {
|
($F: expr, $STRING: expr $(, $FROM:expr => $TO:expr)+) => {
|
||||||
for c in $s.chars() {
|
for c in $STRING.chars() {
|
||||||
match c {
|
match c {
|
||||||
$(
|
$(
|
||||||
$from => $f.write_str($to)?,
|
$FROM => $F.write_str($TO)?,
|
||||||
)+
|
)+
|
||||||
_ => $f.write_char(c)?,
|
_ => $F.write_char(c)?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
/// Writes `S` to `F`, optionally surrounding in `QUOTE`s, if FN(S) fails,
|
||||||
|
/// and mapping any characters `FROM` => `TO` their escaped equivalents.
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! write_quoted_string {
|
||||||
|
($F: expr, $QUOTE: literal, $STRING: expr, $FN: expr $(, $FROM:expr => $TO:expr)+) => {
|
||||||
|
if nom::sequence::terminated($FN, nom::combinator::eof)($STRING).is_ok() {
|
||||||
|
$F.write_str($STRING)?;
|
||||||
|
} else {
|
||||||
|
// must be escaped
|
||||||
|
$F.write_char($QUOTE)?;
|
||||||
|
for c in $STRING.chars() {
|
||||||
|
match c {
|
||||||
|
$(
|
||||||
|
$FROM => $F.write_str($TO)?,
|
||||||
|
)+
|
||||||
|
_ => $F.write_char(c)?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$F.write_char($QUOTE)?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// A string fragment contains a fragment of a string being parsed: either
|
/// A string fragment contains a fragment of a string being parsed: either
|
||||||
/// a non-empty Literal (a series of non-escaped characters) or a single.
|
/// a non-empty Literal (a series of non-escaped characters) or a single.
|
||||||
|
@ -40,41 +63,51 @@ enum StringFragment<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a single-quoted literal string.
|
/// Parse a single-quoted literal string.
|
||||||
pub fn single_quoted_string(i: &str) -> IResult<&str, String> {
|
pub fn single_quoted_string(i: &str) -> ParseResult<&str, String> {
|
||||||
let escaped = preceded(
|
let escaped = preceded(
|
||||||
char('\\'),
|
char('\\'),
|
||||||
alt((char('\\'), char('\''), value('\n', char('n')))),
|
expect(
|
||||||
|
r#"invalid escape sequence, expected \\, \' or \n"#,
|
||||||
|
alt((char('\\'), char('\''), value('\n', char('n')))),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
string(
|
string(
|
||||||
'\'',
|
'\'',
|
||||||
|
"unterminated string literal",
|
||||||
verify(is_not("'\\\n"), |s: &str| !s.is_empty()),
|
verify(is_not("'\\\n"), |s: &str| !s.is_empty()),
|
||||||
escaped,
|
escaped,
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a double-quoted identifier string.
|
/// Parse a double-quoted identifier string.
|
||||||
pub fn double_quoted_string(i: &str) -> IResult<&str, String> {
|
pub fn double_quoted_string(i: &str) -> ParseResult<&str, String> {
|
||||||
let escaped = preceded(
|
let escaped = preceded(
|
||||||
char('\\'),
|
char('\\'),
|
||||||
alt((char('\\'), char('"'), value('\n', char('n')))),
|
expect(
|
||||||
|
r#"invalid escape sequence, expected \\, \" or \n"#,
|
||||||
|
alt((char('\\'), char('"'), value('\n', char('n')))),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
string(
|
string(
|
||||||
'"',
|
'"',
|
||||||
|
"unterminated string literal",
|
||||||
verify(is_not("\"\\\n"), |s: &str| !s.is_empty()),
|
verify(is_not("\"\\\n"), |s: &str| !s.is_empty()),
|
||||||
escaped,
|
escaped,
|
||||||
)(i)
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn string<'a, T, U>(
|
fn string<'a, T, U, E>(
|
||||||
delimiter: char,
|
delimiter: char,
|
||||||
|
unterminated_message: &'static str,
|
||||||
literal: T,
|
literal: T,
|
||||||
escaped: U,
|
escaped: U,
|
||||||
) -> impl FnMut(&'a str) -> IResult<&'a str, String>
|
) -> impl FnMut(&'a str) -> ParseResult<&'a str, String, E>
|
||||||
where
|
where
|
||||||
T: Parser<&'a str, &'a str, Error<&'a str>>,
|
T: Parser<&'a str, &'a str, E>,
|
||||||
U: Parser<&'a str, char, Error<&'a str>>,
|
U: Parser<&'a str, char, E>,
|
||||||
|
E: ParseError<'a>,
|
||||||
{
|
{
|
||||||
let fragment = alt((
|
let fragment = alt((
|
||||||
map(literal, StringFragment::Literal),
|
map(literal, StringFragment::Literal),
|
||||||
|
@ -89,13 +122,17 @@ where
|
||||||
string
|
string
|
||||||
});
|
});
|
||||||
|
|
||||||
delimited(char(delimiter), build_string, char(delimiter))
|
delimited(
|
||||||
|
char(delimiter),
|
||||||
|
build_string,
|
||||||
|
expect(unterminated_message, char(delimiter)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse regular expression literal characters.
|
/// Parse regular expression literal characters.
|
||||||
///
|
///
|
||||||
/// Consumes i until reaching and escaped delimiter ("\/"), newline or eof.
|
/// Consumes i until reaching and escaped delimiter ("\/"), newline or eof.
|
||||||
fn regex_literal(i: &str) -> IResult<&str, &str> {
|
fn regex_literal(i: &str) -> ParseResult<&str, &str> {
|
||||||
let mut remaining = &i[..i.len()];
|
let mut remaining = &i[..i.len()];
|
||||||
let mut consumed = &i[..0];
|
let mut consumed = &i[..0];
|
||||||
|
|
||||||
|
@ -123,7 +160,7 @@ fn regex_literal(i: &str) -> IResult<&str, &str> {
|
||||||
|
|
||||||
/// An unescaped regular expression.
|
/// An unescaped regular expression.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
pub struct Regex(pub(crate) String);
|
pub struct Regex(pub String);
|
||||||
|
|
||||||
impl Display for Regex {
|
impl Display for Regex {
|
||||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
@ -147,13 +184,22 @@ impl From<&str> for Regex {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a regular expression, delimited by `/`.
|
/// Parse a regular expression, delimited by `/`.
|
||||||
pub fn regex(i: &str) -> IResult<&str, Regex> {
|
pub fn regex(i: &str) -> ParseResult<&str, Regex> {
|
||||||
map(string('/', regex_literal, map(tag("\\/"), |_| '/')), Regex)(i)
|
map(
|
||||||
|
string(
|
||||||
|
'/',
|
||||||
|
"unterminated regex literal",
|
||||||
|
regex_literal,
|
||||||
|
map(tag("\\/"), |_| '/'),
|
||||||
|
),
|
||||||
|
Regex,
|
||||||
|
)(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::assert_expect_error;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_double_quoted_string() {
|
fn test_double_quoted_string() {
|
||||||
|
@ -180,18 +226,32 @@ mod test {
|
||||||
let (_, got) = double_quoted_string("\"quick\rdraw\"").unwrap();
|
let (_, got) = double_quoted_string("\"quick\rdraw\"").unwrap();
|
||||||
assert_eq!(got, "quick\rdraw");
|
assert_eq!(got, "quick\rdraw");
|
||||||
|
|
||||||
|
// Empty string
|
||||||
|
let (i, got) = double_quoted_string("\"\"").unwrap();
|
||||||
|
assert_eq!(i, "");
|
||||||
|
assert_eq!(got, "");
|
||||||
|
|
||||||
// ┌─────────────────────────────┐
|
// ┌─────────────────────────────┐
|
||||||
// │ Fallible tests │
|
// │ Fallible tests │
|
||||||
// └─────────────────────────────┘
|
// └─────────────────────────────┘
|
||||||
|
|
||||||
// Not terminated
|
// Not terminated
|
||||||
double_quoted_string(r#""quick draw"#).unwrap_err();
|
assert_expect_error!(
|
||||||
|
double_quoted_string(r#""quick draw"#),
|
||||||
|
"unterminated string literal"
|
||||||
|
);
|
||||||
|
|
||||||
// Literal newline
|
// Literal newline
|
||||||
double_quoted_string("\"quick\ndraw\"").unwrap_err();
|
assert_expect_error!(
|
||||||
|
double_quoted_string("\"quick\ndraw\""),
|
||||||
|
"unterminated string literal"
|
||||||
|
);
|
||||||
|
|
||||||
// Invalid escape
|
// Invalid escape
|
||||||
double_quoted_string(r#""quick\idraw""#).unwrap_err();
|
assert_expect_error!(
|
||||||
|
double_quoted_string(r#""quick\idraw""#),
|
||||||
|
r#"invalid escape sequence, expected \\, \" or \n"#
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -219,15 +279,25 @@ mod test {
|
||||||
let (_, got) = single_quoted_string("'quick\rdraw'").unwrap();
|
let (_, got) = single_quoted_string("'quick\rdraw'").unwrap();
|
||||||
assert_eq!(got, "quick\rdraw");
|
assert_eq!(got, "quick\rdraw");
|
||||||
|
|
||||||
|
// Empty string
|
||||||
|
let (i, got) = single_quoted_string("''").unwrap();
|
||||||
|
assert_eq!(i, "");
|
||||||
|
assert_eq!(got, "");
|
||||||
|
|
||||||
// ┌─────────────────────────────┐
|
// ┌─────────────────────────────┐
|
||||||
// │ Fallible tests │
|
// │ Fallible tests │
|
||||||
// └─────────────────────────────┘
|
// └─────────────────────────────┘
|
||||||
|
|
||||||
// Not terminated
|
// Not terminated
|
||||||
single_quoted_string(r#"'quick draw"#).unwrap_err();
|
assert_expect_error!(
|
||||||
|
single_quoted_string(r#"'quick draw"#),
|
||||||
|
"unterminated string literal"
|
||||||
|
);
|
||||||
|
|
||||||
// Invalid escape
|
// Invalid escape
|
||||||
single_quoted_string(r#"'quick\idraw'"#).unwrap_err();
|
assert_expect_error!(
|
||||||
|
single_quoted_string(r#"'quick\idraw'"#),
|
||||||
|
r#"invalid escape sequence, expected \\, \' or \n"#
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -244,19 +314,20 @@ mod test {
|
||||||
assert_eq!(got, "hello\\n".into());
|
assert_eq!(got, "hello\\n".into());
|
||||||
|
|
||||||
// Empty regex
|
// Empty regex
|
||||||
let (_, got) = regex("//").unwrap();
|
let (i, got) = regex("//").unwrap();
|
||||||
|
assert_eq!(i, "");
|
||||||
assert_eq!(got, "".into());
|
assert_eq!(got, "".into());
|
||||||
|
|
||||||
// Fallible cases
|
// Fallible cases
|
||||||
|
|
||||||
// Missing trailing delimiter
|
// Missing trailing delimiter
|
||||||
regex(r#"/hello"#).unwrap_err();
|
assert_expect_error!(regex(r#"/hello"#), "unterminated regex literal");
|
||||||
|
|
||||||
// Embedded newline
|
// Embedded newline
|
||||||
regex("/hello\nworld").unwrap_err();
|
assert_expect_error!(regex("/hello\nworld/"), "unterminated regex literal");
|
||||||
|
|
||||||
// Single backslash fails, which matches Go implementation
|
// Single backslash fails, which matches Go implementation
|
||||||
// See: https://go.dev/play/p/_8J1v5-382G
|
// See: https://go.dev/play/p/_8J1v5-382G
|
||||||
regex(r#"/\/"#).unwrap_err();
|
assert_expect_error!(regex(r#"/\/"#), "unterminated regex literal");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,3 +7,19 @@ macro_rules! assert_failure {
|
||||||
assert!(matches!($RESULT.unwrap_err(), nom::Err::Failure(_)));
|
assert!(matches!($RESULT.unwrap_err(), nom::Err::Failure(_)));
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Asserts that the result of a nom parser is an [`crate::internal::Error::Syntax`] and a [`nom::Err::Failure`].
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! assert_expect_error {
|
||||||
|
($RESULT:expr, $MSG:expr) => {
|
||||||
|
match $RESULT.unwrap_err() {
|
||||||
|
nom::Err::Failure($crate::internal::Error::Syntax {
|
||||||
|
input: _,
|
||||||
|
message: got,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(format!("{}", got), $MSG)
|
||||||
|
}
|
||||||
|
e => panic!("Expected Failure(Syntax(_, msg), got {:?}", e),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
|
@ -504,7 +504,7 @@ impl ChunkAdapter {
|
||||||
|
|
||||||
// calculate sort key
|
// calculate sort key
|
||||||
let pk_cols = schema.primary_key();
|
let pk_cols = schema.primary_key();
|
||||||
let sort_key = partition_sort_key_ref.filter_to(&pk_cols);
|
let sort_key = partition_sort_key_ref.filter_to(&pk_cols, parquet_file.partition_id.get());
|
||||||
assert!(
|
assert!(
|
||||||
!sort_key.is_empty(),
|
!sort_key.is_empty(),
|
||||||
"Sort key can never be empty because there should at least be a time column",
|
"Sort key can never be empty because there should at least be a time column",
|
||||||
|
|
|
@ -209,15 +209,15 @@ impl SortKey {
|
||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// Panics if any columns in the primary key are NOT present in this sort key.
|
/// Panics if any columns in the primary key are NOT present in this sort key.
|
||||||
pub fn filter_to(&self, primary_key: &[&str]) -> SortKey {
|
pub fn filter_to(&self, primary_key: &[&str], partition_id: i64) -> SortKey {
|
||||||
let missing_from_catalog_key: Vec<_> = primary_key
|
let missing_from_catalog_key: Vec<_> = primary_key
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|col| !self.contains(col))
|
.filter(|col| !self.contains(col))
|
||||||
.collect();
|
.collect();
|
||||||
if !missing_from_catalog_key.is_empty() {
|
if !missing_from_catalog_key.is_empty() {
|
||||||
panic!(
|
panic!(
|
||||||
"Primary key column(s) found that don't appear in the catalog sort key: [{:?}]",
|
"Primary key column(s) found that don't appear in the catalog sort key [{:?}] of partition: {}. Sort key: {:?}",
|
||||||
missing_from_catalog_key
|
missing_from_catalog_key, partition_id, self
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -914,7 +914,7 @@ mod tests {
|
||||||
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
||||||
let data_primary_key = ["host", "env", "time"];
|
let data_primary_key = ["host", "env", "time"];
|
||||||
|
|
||||||
let filtered = catalog_sort_key.filter_to(&data_primary_key);
|
let filtered = catalog_sort_key.filter_to(&data_primary_key, 1);
|
||||||
assert_eq!(catalog_sort_key, filtered);
|
assert_eq!(catalog_sort_key, filtered);
|
||||||
|
|
||||||
// If the catalog sort key contains more columns than the primary key, the filtered key
|
// If the catalog sort key contains more columns than the primary key, the filtered key
|
||||||
|
@ -922,7 +922,7 @@ mod tests {
|
||||||
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
||||||
let data_primary_key = ["host", "time"];
|
let data_primary_key = ["host", "time"];
|
||||||
|
|
||||||
let filtered = catalog_sort_key.filter_to(&data_primary_key);
|
let filtered = catalog_sort_key.filter_to(&data_primary_key, 1);
|
||||||
let expected = SortKey::from_columns(["host", "time"]);
|
let expected = SortKey::from_columns(["host", "time"]);
|
||||||
assert_eq!(expected, filtered);
|
assert_eq!(expected, filtered);
|
||||||
|
|
||||||
|
@ -931,7 +931,7 @@ mod tests {
|
||||||
let catalog_sort_key = SortKey::from_columns(["host", "env", "zone", "time"]);
|
let catalog_sort_key = SortKey::from_columns(["host", "env", "zone", "time"]);
|
||||||
let data_primary_key = ["env", "host", "time"];
|
let data_primary_key = ["env", "host", "time"];
|
||||||
|
|
||||||
let filtered = catalog_sort_key.filter_to(&data_primary_key);
|
let filtered = catalog_sort_key.filter_to(&data_primary_key, 1);
|
||||||
let expected = SortKey::from_columns(["host", "env", "time"]);
|
let expected = SortKey::from_columns(["host", "env", "time"]);
|
||||||
assert_eq!(expected, filtered);
|
assert_eq!(expected, filtered);
|
||||||
}
|
}
|
||||||
|
@ -946,7 +946,7 @@ mod tests {
|
||||||
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
|
||||||
let data_primary_key = ["host", "env", "zone", "time"];
|
let data_primary_key = ["host", "env", "zone", "time"];
|
||||||
|
|
||||||
catalog_sort_key.filter_to(&data_primary_key);
|
catalog_sort_key.filter_to(&data_primary_key, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue