parent
f03e370ecc
commit
fbe3e360d2
|
@ -870,6 +870,17 @@ pub struct PartitionParam {
|
|||
pub table_id: TableId,
|
||||
}
|
||||
|
||||
/// Data recorded when compaction skips a partition.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::FromRow)]
|
||||
pub struct SkippedCompaction {
|
||||
/// the partition
|
||||
pub partition_id: PartitionId,
|
||||
/// the reason compaction was skipped
|
||||
pub reason: String,
|
||||
/// when compaction was skipped
|
||||
pub skipped_at: Timestamp,
|
||||
}
|
||||
|
||||
/// Data object for a tombstone.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, sqlx::FromRow)]
|
||||
pub struct Tombstone {
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
CREATE TABLE IF NOT EXISTS skipped_compactions (
|
||||
partition_id BIGINT REFERENCES PARTITION (id) ON DELETE CASCADE,
|
||||
reason TEXT NOT NULL,
|
||||
skipped_at BIGINT NOT NULL,
|
||||
PRIMARY KEY (partition_id)
|
||||
);
|
|
@ -5,8 +5,8 @@ use data_types::{
|
|||
Column, ColumnSchema, ColumnType, ColumnTypeCount, Namespace, NamespaceId, NamespaceSchema,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
|
||||
PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
|
||||
Shard, ShardId, ShardIndex, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone,
|
||||
TombstoneId, TopicId, TopicMetadata,
|
||||
Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, TableSchema,
|
||||
Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
|
||||
};
|
||||
use iox_time::TimeProvider;
|
||||
use snafu::{OptionExt, Snafu};
|
||||
|
@ -20,6 +20,7 @@ use uuid::Uuid;
|
|||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
#[snafu(visibility(pub(crate)))]
|
||||
pub enum Error {
|
||||
#[snafu(display("name {} already exists", name))]
|
||||
NameExists { name: String },
|
||||
|
@ -115,6 +116,17 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("database setup error: {}", source))]
|
||||
Setup { source: sqlx::Error },
|
||||
|
||||
#[snafu(display(
|
||||
"could not record a skipped compaction for partition {partition_id}: {source}"
|
||||
))]
|
||||
CouldNotRecordSkippedCompaction {
|
||||
source: sqlx::Error,
|
||||
partition_id: PartitionId,
|
||||
},
|
||||
|
||||
#[snafu(display("could not list skipped compactions: {source}"))]
|
||||
CouldNotListSkippedCompactions { source: sqlx::Error },
|
||||
}
|
||||
|
||||
/// A specialized `Error` for Catalog errors
|
||||
|
@ -128,9 +140,10 @@ pub trait Catalog: Send + Sync + Debug {
|
|||
|
||||
/// Creates a new [`Transaction`].
|
||||
///
|
||||
/// Creating transactions is potentially expensive. Holding one consumes resources. The number of parallel active
|
||||
/// transactions might be limited per catalog, so you MUST NOT rely on the ability to create multiple transactions in
|
||||
/// parallel for correctness. Parallel transactions must only be used for scaling.
|
||||
/// Creating transactions is potentially expensive. Holding one consumes resources. The number
|
||||
/// of parallel active transactions might be limited per catalog, so you MUST NOT rely on the
|
||||
/// ability to create multiple transactions in parallel for correctness. Parallel transactions
|
||||
/// must only be used for scaling.
|
||||
async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error>;
|
||||
|
||||
/// Accesses the repositories without a transaction scope.
|
||||
|
@ -152,10 +165,10 @@ pub(crate) mod sealed {
|
|||
|
||||
/// Helper trait to implement commit and abort of an transaction.
|
||||
///
|
||||
/// The problem is that both methods cannot take `self` directly, otherwise the [`Transaction`] would not be object
|
||||
/// safe. Therefore we can only take a reference. To avoid that a user uses a transaction after calling one of the
|
||||
/// finalizers, we use a tiny trick and take `Box<dyn Transaction>` in our public interface and use a sealed trait
|
||||
/// for the actual implementation.
|
||||
/// The problem is that both methods cannot take `self` directly, otherwise the [`Transaction`]
|
||||
/// would not be object safe. Therefore we can only take a reference. To avoid that a user uses
|
||||
/// a transaction after calling one of the finalizers, we use a tiny trick and take `Box<dyn
|
||||
/// Transaction>` in our public interface and use a sealed trait for the actual implementation.
|
||||
#[async_trait]
|
||||
pub trait TransactionFinalize: Send + Sync + Debug {
|
||||
async fn commit_inplace(&mut self) -> Result<(), Error>;
|
||||
|
@ -170,15 +183,17 @@ pub(crate) mod sealed {
|
|||
///
|
||||
/// Repositories can cheaply be borrowed from it.
|
||||
///
|
||||
/// Note that after any method in this transaction (including all repositories derived from it) returns an error, the
|
||||
/// transaction MIGHT be poisoned and will return errors for all operations, depending on the backend.
|
||||
/// Note that after any method in this transaction (including all repositories derived from it)
|
||||
/// returns an error, the transaction MIGHT be poisoned and will return errors for all operations,
|
||||
/// depending on the backend.
|
||||
///
|
||||
///
|
||||
/// # Drop
|
||||
///
|
||||
/// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will abort the
|
||||
/// transaction. However resources might not be released immediately, so it is adviced to always call
|
||||
/// [`abort`](Self::abort) when you want to enforce that. Dropping w/o commiting/aborting will also log a warning.
|
||||
/// Dropping a transaction without calling [`commit`](Self::commit) or [`abort`](Self::abort) will
|
||||
/// abort the transaction. However resources might not be released immediately, so it is adviced to
|
||||
/// always call [`abort`](Self::abort) when you want to enforce that. Dropping w/o
|
||||
/// commiting/aborting will also log a warning.
|
||||
#[async_trait]
|
||||
pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoCollection {
|
||||
/// Commits a transaction.
|
||||
|
@ -187,9 +202,9 @@ pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoC
|
|||
///
|
||||
/// If successful, all changes will be visible to other transactions.
|
||||
///
|
||||
/// If an error is returned, the transaction may or or not be committed. This might be due to IO errors after the
|
||||
/// transaction was finished. However in either case, the transaction is atomic and can only succeed or fail
|
||||
/// entirely.
|
||||
/// If an error is returned, the transaction may or or not be committed. This might be due to
|
||||
/// IO errors after the transaction was finished. However in either case, the transaction is
|
||||
/// atomic and can only succeed or fail entirely.
|
||||
async fn commit(mut self: Box<Self>) -> Result<(), Error> {
|
||||
self.commit_inplace().await
|
||||
}
|
||||
|
@ -455,6 +470,17 @@ pub trait PartitionRepo: Send + Sync {
|
|||
partition_id: PartitionId,
|
||||
sort_key: &[&str],
|
||||
) -> Result<Partition>;
|
||||
|
||||
/// Record an instance of a partition being selected for compaction but compaction was not
|
||||
/// completed for the specified reason.
|
||||
async fn record_skipped_compaction(
|
||||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
) -> Result<()>;
|
||||
|
||||
/// List the records of compacting a partition being skipped. This is mostly useful for testing.
|
||||
async fn list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
|
||||
}
|
||||
|
||||
/// Functions for working with tombstones in the catalog
|
||||
|
@ -1212,7 +1238,8 @@ pub(crate) mod test_helpers {
|
|||
assert!(c.id > ColumnId::new(0));
|
||||
assert_eq!(c, cc);
|
||||
|
||||
// test that attempting to create an already defined column of a different type returns error
|
||||
// test that attempting to create an already defined column of a different type returns
|
||||
// error
|
||||
let err = repos
|
||||
.columns()
|
||||
.create_or_get("column_test", table.id, ColumnType::U64)
|
||||
|
@ -1555,6 +1582,35 @@ pub(crate) mod test_helpers {
|
|||
updated_other_partition.sort_key,
|
||||
vec!["tag2", "tag1", "tag3 , with comma", "time"]
|
||||
);
|
||||
|
||||
// The compactor can log why compaction was skipped
|
||||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert!(
|
||||
skipped_compactions.is_empty(),
|
||||
"Expected no skipped compactions, got: {skipped_compactions:?}"
|
||||
);
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(other_partition.id, "I am le tired")
|
||||
.await
|
||||
.unwrap();
|
||||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, other_partition.id);
|
||||
assert_eq!(skipped_compactions[0].reason, "I am le tired");
|
||||
|
||||
// Only save the last reason that any particular partition was skipped (really if the
|
||||
// partition appears in the skipped compactions, it shouldn't become a compaction candidate
|
||||
// again, but race conditions and all that)
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(other_partition.id, "I'm on fire")
|
||||
.await
|
||||
.unwrap();
|
||||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, other_partition.id);
|
||||
assert_eq!(skipped_compactions[0].reason, "I'm on fire");
|
||||
}
|
||||
|
||||
async fn test_tombstone(catalog: Arc<dyn Catalog>) {
|
||||
|
|
|
@ -15,8 +15,8 @@ use data_types::{
|
|||
Column, ColumnId, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
|
||||
PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
|
||||
Shard, ShardId, ShardIndex, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
TopicId, TopicMetadata,
|
||||
Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp,
|
||||
Tombstone, TombstoneId, TopicId, TopicMetadata,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use observability_deps::tracing::warn;
|
||||
|
@ -63,6 +63,7 @@ struct MemCollections {
|
|||
columns: Vec<Column>,
|
||||
shards: Vec<Shard>,
|
||||
partitions: Vec<Partition>,
|
||||
skipped_compactions: Vec<SkippedCompaction>,
|
||||
tombstones: Vec<Tombstone>,
|
||||
parquet_files: Vec<ParquetFile>,
|
||||
processed_tombstones: Vec<ProcessedTombstone>,
|
||||
|
@ -844,6 +845,38 @@ impl PartitionRepo for MemTxn {
|
|||
None => Err(Error::PartitionNotFound { id: partition_id }),
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_skipped_compaction(
|
||||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
) -> Result<()> {
|
||||
let reason = reason.to_string();
|
||||
let skipped_at = Timestamp::new(self.time_provider.now().timestamp_nanos());
|
||||
|
||||
let stage = self.stage();
|
||||
match stage
|
||||
.skipped_compactions
|
||||
.iter_mut()
|
||||
.find(|s| s.partition_id == partition_id)
|
||||
{
|
||||
Some(s) => {
|
||||
s.reason = reason;
|
||||
s.skipped_at = skipped_at;
|
||||
}
|
||||
None => stage.skipped_compactions.push(SkippedCompaction {
|
||||
partition_id,
|
||||
reason,
|
||||
skipped_at,
|
||||
}),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>> {
|
||||
let stage = self.stage();
|
||||
Ok(stage.skipped_compactions.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
|
@ -9,8 +9,9 @@ use async_trait::async_trait;
|
|||
use data_types::{
|
||||
Column, ColumnType, ColumnTypeCount, Namespace, NamespaceId, ParquetFile, ParquetFileId,
|
||||
ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey, PartitionParam,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, Table,
|
||||
TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
|
||||
SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
|
||||
TopicMetadata,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use metric::{DurationHistogram, Metric};
|
||||
|
@ -246,6 +247,8 @@ decorate!(
|
|||
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
|
||||
"partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result<Option<PartitionInfo>>;
|
||||
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result<Partition>;
|
||||
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str) -> Result<()>;
|
||||
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
|
||||
]
|
||||
);
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use crate::{
|
||||
interface::{
|
||||
sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnUpsertRequest, Error,
|
||||
self, sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnUpsertRequest, Error,
|
||||
NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo,
|
||||
RepoCollection, Result, ShardRepo, TablePersistInfo, TableRepo, TombstoneRepo,
|
||||
TopicMetadataRepo, Transaction,
|
||||
|
@ -14,11 +14,12 @@ use data_types::{
|
|||
Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
|
||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey,
|
||||
PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
|
||||
ShardIndex, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
|
||||
TopicMetadata,
|
||||
ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone,
|
||||
TombstoneId, TopicId, TopicMetadata,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use observability_deps::tracing::{debug, info, warn};
|
||||
use snafu::prelude::*;
|
||||
use sqlx::{
|
||||
migrate::Migrator, postgres::PgPoolOptions, types::Uuid, Acquire, Executor, Postgres, Row,
|
||||
};
|
||||
|
@ -1288,6 +1289,43 @@ RETURNING *;
|
|||
|
||||
Ok(partition)
|
||||
}
|
||||
|
||||
async fn record_skipped_compaction(
|
||||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO skipped_compactions
|
||||
( partition_id, reason, skipped_at )
|
||||
VALUES
|
||||
( $1, $2, extract(epoch from NOW()) )
|
||||
ON CONFLICT ( partition_id )
|
||||
DO UPDATE
|
||||
SET
|
||||
reason = EXCLUDED.reason,
|
||||
skipped_at = EXCLUDED.skipped_at;
|
||||
"#,
|
||||
)
|
||||
.bind(partition_id)
|
||||
.bind(reason)
|
||||
.execute(&mut self.inner)
|
||||
.await
|
||||
.context(interface::CouldNotRecordSkippedCompactionSnafu { partition_id })?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>> {
|
||||
sqlx::query_as::<_, SkippedCompaction>(
|
||||
r#"
|
||||
SELECT * FROM skipped_compactions
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&mut self.inner)
|
||||
.await
|
||||
.context(interface::CouldNotListSkippedCompactionsSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -1718,7 +1756,7 @@ SELECT partition_id, table_id, shard_id, namespace_id, count(id)
|
|||
FROM parquet_file
|
||||
WHERE compaction_level = 0 and to_delete is null
|
||||
and shard_id = $1
|
||||
and created_at > $2
|
||||
and created_at > $2
|
||||
group by 1, 2, 3, 4
|
||||
having count(id) >= $3
|
||||
order by 5 DESC
|
||||
|
|
Loading…
Reference in New Issue