feat: Create compactor service to list skipped compactions

pull/24376/head
Carol (Nichols || Goulding) 2022-10-19 11:03:07 -04:00
parent 5f04acc9d5
commit ba25300b01
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
14 changed files with 348 additions and 21 deletions

2
Cargo.lock generated
View File

@ -769,6 +769,7 @@ dependencies = [
"data_types",
"datafusion 0.1.0",
"futures",
"generated_types",
"iox_catalog",
"iox_query",
"iox_tests",
@ -784,6 +785,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic",
"uuid",
"workspace-hack",
]

View File

@ -12,6 +12,7 @@ bytes = "1.2"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
object_store = "0.5.1"
@ -25,6 +26,7 @@ thiserror = "1.0"
iox_time = { path = "../iox_time" }
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.4" }
tonic = { version = "0.8" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}

View File

@ -2,6 +2,7 @@
use crate::{cold, compact::Compactor, hot};
use async_trait::async_trait;
use data_types::SkippedCompaction;
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
@ -20,9 +21,14 @@ use tokio_util::sync::CancellationToken;
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {}
/// The [`CompactorHandler`] does nothing at this point
/// The [`CompactorHandler`] runs the compactor as a service and handles skipped compactions.
#[async_trait]
pub trait CompactorHandler: Send + Sync {
/// Return skipped compactions from the catalog
async fn skipped_compactions(
&self,
) -> Result<Vec<SkippedCompaction>, ListSkippedCompactionsError>;
/// Wait until the handler finished to shutdown.
///
/// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
@ -40,12 +46,11 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
handle.map_err(Arc::new).boxed().shared()
}
/// Implementation of the `CompactorHandler` trait (that currently does nothing)
/// Implementation of the `CompactorHandler` trait
#[derive(Debug)]
pub struct CompactorHandlerImpl {
/// Data to compact
#[allow(dead_code)]
compactor_data: Arc<Compactor>,
/// Management of all data relevant to compaction
compactor: Arc<Compactor>,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
@ -59,21 +64,19 @@ pub struct CompactorHandlerImpl {
impl CompactorHandlerImpl {
/// Initialize the Compactor
pub fn new(compactor: Compactor) -> Self {
let compactor_data = Arc::new(compactor);
pub fn new(compactor: Arc<Compactor>) -> Self {
let shutdown = CancellationToken::new();
let runner_handle = tokio::task::spawn(run_compactor(
Arc::clone(&compactor_data),
Arc::clone(&compactor),
shutdown.child_token(),
));
let runner_handle = shared_handle(runner_handle);
info!("compactor started with config {:?}", compactor_data.config);
info!("compactor started with config {:?}", compactor.config);
let exec = Arc::clone(&compactor_data.exec);
let exec = Arc::clone(&compactor.exec);
Self {
compactor_data,
compactor,
shutdown,
runner_handle,
exec,
@ -130,13 +133,22 @@ pub struct CompactorConfig {
/// Minimum number of rows allocated for each record batch fed into DataFusion plan
///
/// We will use max(parquet_file's row_count, min_num_rows_allocated_per_record_batch_to_datafusion_plan)
/// We will use:
///
/// ```text
/// max(
/// parquet_file's row_count,
/// min_num_rows_allocated_per_record_batch_to_datafusion_plan
/// )
/// ```
///
/// to estimate number of rows allocated for each record batch fed into DataFusion plan.
pub min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64,
/// Max number of files to compact per partition
///
/// Due to limit in fan-in of datafusion plan, we need to limit the number of files to compact per partition.
/// Due to limit in fan-in of datafusion plan, we need to limit the number of files to compact
/// per partition.
pub max_num_compacting_files: usize,
}
@ -188,8 +200,28 @@ pub async fn run_compactor_once(compactor: Arc<Compactor>) {
);
}
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum ListSkippedCompactionsError {
#[error(transparent)]
SkippedCompactionLookup(iox_catalog::interface::Error),
}
#[async_trait]
impl CompactorHandler for CompactorHandlerImpl {
async fn skipped_compactions(
&self,
) -> Result<Vec<SkippedCompaction>, ListSkippedCompactionsError> {
self.compactor
.catalog
.repositories()
.await
.partitions()
.list_skipped_compactions()
.await
.map_err(ListSkippedCompactionsError::SkippedCompactionLookup)
}
async fn join(&self) {
self.runner_handle
.clone()
@ -212,3 +244,43 @@ impl Drop for CompactorHandlerImpl {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::{test_setup, TestSetup};
#[tokio::test]
async fn list_skipped_compactions() {
let TestSetup {
compactor,
table,
shard,
..
} = test_setup().await;
let compactor_handler = CompactorHandlerImpl::new(Arc::clone(&compactor));
// no skipped compactions
let skipped_compactions = compactor_handler.skipped_compactions().await.unwrap();
assert!(
skipped_compactions.is_empty(),
"Expected no compactions, got {skipped_compactions:?}"
);
// insert a partition and a skipped compaction
let partition = table.with_shard(&shard).create_partition("one").await;
{
let mut repos = compactor.catalog.repositories().await;
repos
.partitions()
.record_skipped_compaction(partition.partition.id, "Not today", 3, 2, 100_000, 100)
.await
.unwrap()
}
let skipped_compactions = compactor_handler.skipped_compactions().await.unwrap();
assert_eq!(skipped_compactions.len(), 1);
assert_eq!(skipped_compactions[0].partition_id, partition.partition.id);
}
}

View File

@ -1,9 +1,11 @@
//! Compactor server entrypoint.
use std::sync::Arc;
use self::grpc::GrpcDelegate;
use crate::handler::CompactorHandler;
use std::fmt::Debug;
use std::sync::Arc;
pub mod grpc;
/// The [`CompactorServer`] manages the lifecycle and contains all state for a
/// `compactor` server instance.
@ -11,14 +13,20 @@ use std::fmt::Debug;
pub struct CompactorServer<C: CompactorHandler> {
metrics: Arc<metric::Registry>,
grpc: GrpcDelegate<C>,
handler: Arc<C>,
}
impl<C: CompactorHandler> CompactorServer<C> {
/// Initialise a new [`CompactorServer`] using the provided HTTP and gRPC
/// handlers.
pub fn new(metrics: Arc<metric::Registry>, handler: Arc<C>) -> Self {
Self { metrics, handler }
pub fn new(metrics: Arc<metric::Registry>, grpc: GrpcDelegate<C>, handler: Arc<C>) -> Self {
Self {
metrics,
grpc,
handler,
}
}
/// Return the [`metric::Registry`] used by the router.
@ -36,3 +44,10 @@ impl<C: CompactorHandler> CompactorServer<C> {
self.handler.shutdown();
}
}
impl<I: CompactorHandler + Debug> CompactorServer<I> {
/// Get a reference to the grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<I> {
&self.grpc
}
}

View File

@ -0,0 +1,72 @@
//! gRPC service implementations for `compactor`.
use crate::handler::{CompactorHandler, ListSkippedCompactionsError};
use generated_types::influxdata::iox::compactor::v1::{
self as proto,
skipped_compaction_service_server::{SkippedCompactionService, SkippedCompactionServiceServer},
};
use std::sync::Arc;
use tonic::{Request, Response};
/// This type is responsible for managing all gRPC services exposed by `compactor`.
#[derive(Debug, Default)]
pub struct GrpcDelegate<I: CompactorHandler> {
compactor_handler: Arc<I>,
}
impl<I: CompactorHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the specified
/// `compactor_handler`.
pub fn new(compactor_handler: Arc<I>) -> Self {
Self { compactor_handler }
}
/// Acquire a SkippedCompaction gRPC service implementation.
pub fn skipped_compaction_service(
&self,
) -> SkippedCompactionServiceServer<impl SkippedCompactionService> {
SkippedCompactionServiceServer::new(SkippedCompactionServiceImpl::new(Arc::clone(
&self.compactor_handler,
) as _))
}
}
/// Implementation of skipped compaction
struct SkippedCompactionServiceImpl {
handler: Arc<dyn CompactorHandler + Send + Sync + 'static>,
}
impl SkippedCompactionServiceImpl {
pub fn new(handler: Arc<dyn CompactorHandler + Send + Sync + 'static>) -> Self {
Self { handler }
}
}
impl From<ListSkippedCompactionsError> for tonic::Status {
/// Logs and converts a result from the business logic into the appropriate tonic status
fn from(_err: ListSkippedCompactionsError) -> Self {
Self::unimplemented("Not yet implemented")
}
}
#[tonic::async_trait]
impl SkippedCompactionService for SkippedCompactionServiceImpl {
async fn list_skipped_compactions(
&self,
_request: Request<proto::ListSkippedCompactionsRequest>,
) -> Result<Response<proto::ListSkippedCompactionsResponse>, tonic::Status> {
let skipped_compactions = self
.handler
.skipped_compactions()
.await?
.into_iter()
.map(From::from)
.collect();
Ok(tonic::Response::new(
proto::ListSkippedCompactionsResponse {
skipped_compactions,
},
))
}
}

View File

@ -19,6 +19,7 @@ fn main() -> Result<()> {
/// Creates:
///
/// - `influxdata.iox.catalog.v1.rs`
/// - `influxdata.iox.compactor.v1.rs`
/// - `influxdata.iox.delete.v1.rs`
/// - `influxdata.iox.ingester.v1.rs`
/// - `influxdata.iox.namespace.v1.rs`
@ -32,6 +33,7 @@ fn main() -> Result<()> {
/// - `influxdata.platform.storage.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let catalog_path = root.join("influxdata/iox/catalog/v1");
let compactor_path = root.join("influxdata/iox/compactor/v1");
let delete_path = root.join("influxdata/iox/delete/v1");
let ingester_path = root.join("influxdata/iox/ingester/v1");
let namespace_path = root.join("influxdata/iox/namespace/v1");
@ -47,6 +49,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let proto_files = vec![
catalog_path.join("parquet_file.proto"),
catalog_path.join("service.proto"),
compactor_path.join("skipped_compaction.proto"),
delete_path.join("service.proto"),
ingester_path.join("parquet_metadata.proto"),
ingester_path.join("query.proto"),

View File

@ -0,0 +1,31 @@
syntax = "proto3";
package influxdata.iox.compactor.v1;
option go_package = "github.com/influxdata/iox/compactor/v1";
service SkippedCompactionService {
// List all skipped compactions in the catalog
rpc ListSkippedCompactions(ListSkippedCompactionsRequest) returns (ListSkippedCompactionsResponse);
}
message ListSkippedCompactionsRequest {}
message ListSkippedCompactionsResponse {
// A list of skipped compactions
repeated SkippedCompaction skipped_compactions = 1;
}
message SkippedCompaction {
int64 partition_id = 1;
string reason = 2;
int64 skipped_at = 3;
int64 num_files = 4;
int64 limit_num_files = 5;
int64 estimated_bytes = 6;
int64 limit_bytes = 7;
}

View File

@ -0,0 +1,26 @@
use crate::influxdata::iox::compactor::v1 as proto;
use data_types::SkippedCompaction;
impl From<SkippedCompaction> for proto::SkippedCompaction {
fn from(skipped_compaction: SkippedCompaction) -> Self {
let SkippedCompaction {
partition_id,
reason,
skipped_at,
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
} = skipped_compaction;
Self {
partition_id: partition_id.get(),
reason,
skipped_at: skipped_at.get(),
estimated_bytes,
limit_bytes,
num_files,
limit_num_files,
}
}
}

View File

@ -48,6 +48,16 @@ pub mod influxdata {
}
}
pub mod compactor {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.compactor.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.compactor.v1.serde.rs"
));
}
}
pub mod delete {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.delete.v1.rs"));
@ -241,6 +251,8 @@ pub use influxdata::platform::storage::*;
pub mod google;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod compactor;
#[cfg(any(feature = "data_types_conversions", test))]
pub mod delete_predicate;
#[cfg(any(feature = "data_types_conversions", test))]

View File

@ -6,6 +6,7 @@ mod namespace;
mod parquet_to_lp;
mod print_cpu;
mod schema;
mod skipped_compactions;
#[derive(Debug, Snafu)]
pub enum Error {
@ -20,6 +21,10 @@ pub enum Error {
#[snafu(context(false))]
#[snafu(display("Error in parquet_to_lp subcommand: {}", source))]
ParquetToLp { source: parquet_to_lp::Error },
#[snafu(context(false))]
#[snafu(display("Error in skipped-compactions subcommand: {}", source))]
SkippedCompactions { source: skipped_compactions::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -44,6 +49,9 @@ enum Command {
/// Convert IOx Parquet files back into line protocol format
ParquetToLp(parquet_to_lp::Config),
/// Interrogate skipped compactions
SkippedCompactions(skipped_compactions::Config),
}
pub async fn command<C, CFut>(connection: C, config: Config) -> Result<()>
@ -62,6 +70,10 @@ where
schema::command(connection, config).await?
}
Command::ParquetToLp(config) => parquet_to_lp::command(config).await?,
Command::SkippedCompactions(config) => {
let connection = connection().await;
skipped_compactions::command(connection, config).await?
}
}
Ok(())

View File

@ -0,0 +1,40 @@
//! This module implements the `skipped-compactions` CLI command
use influxdb_iox_client::{connection::Connection, skipped_compactions};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
Client(#[from] influxdb_iox_client::error::Error),
}
/// Various commands for skipped compaction inspection
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// All possible subcommands for skipped compaction
#[derive(Debug, clap::Parser)]
enum Command {
/// List all skipped compactions
List,
}
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
let mut client = skipped_compactions::Client::new(connection);
match config.command {
Command::List => {
let skipped_compactions = client.skipped_compactions().await?;
println!("{}", serde_json::to_string_pretty(&skipped_compactions)?);
} // Deliberately not adding _ => so the compiler will direct people here to impl new
// commands
}
Ok(())
}

View File

@ -20,6 +20,9 @@ pub mod namespace;
/// Client for schema API
pub mod schema;
/// Client for the skipped compactions API
pub mod skipped_compactions;
/// Client for interacting with a remote object store
pub mod store;

View File

@ -0,0 +1,33 @@
use self::generated_types::{skipped_compaction_service_client::SkippedCompactionServiceClient, *};
use crate::{connection::Connection, error::Error};
use client_util::connection::GrpcConnection;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::compactor::v1::*;
}
/// A basic client for fetching the Schema for a Namespace.
#[derive(Debug, Clone)]
pub struct Client {
inner: SkippedCompactionServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(connection: Connection) -> Self {
Self {
inner: SkippedCompactionServiceClient::new(connection.into_grpc_connection()),
}
}
/// List all skipped compactions
pub async fn skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>, Error> {
let response = self
.inner
.list_skipped_compactions(ListSkippedCompactionsRequest {})
.await?;
Ok(response.into_inner().skipped_compactions)
}
}

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use clap_blocks::compactor::CompactorConfig;
use compactor::{
handler::{CompactorHandler, CompactorHandlerImpl},
server::CompactorServer,
server::{grpc::GrpcDelegate, CompactorServer},
};
use data_types::ShardIndex;
use hyper::{Body, Request, Response};
@ -87,6 +87,7 @@ impl<C: CompactorHandler + std::fmt::Debug + 'static> ServerType for CompactorSe
/// Provide a placeholder gRPC service.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().skipped_compaction_service());
serve_builder!(builder);
Ok(())
@ -149,8 +150,11 @@ pub async fn create_compactor_server_type(
)
.await?;
let compactor_handler = Arc::new(CompactorHandlerImpl::new(compactor));
let compactor = CompactorServer::new(metric_registry, compactor_handler);
let compactor_handler = Arc::new(CompactorHandlerImpl::new(Arc::new(compactor)));
let grpc = GrpcDelegate::new(Arc::clone(&compactor_handler));
let compactor = CompactorServer::new(metric_registry, grpc, compactor_handler);
Ok(Arc::new(CompactorServerType::new(compactor, common_state)))
}