feat(processing_engine): Add every mode for scheduled plugins. (#25891)
parent
63bd5096f5
commit
f1ea2d8747
|
@ -3002,6 +3002,7 @@ dependencies = [
|
|||
"data_types",
|
||||
"datafusion_util",
|
||||
"hashbrown 0.15.2",
|
||||
"humantime",
|
||||
"hyper 0.14.32",
|
||||
"influxdb3_cache",
|
||||
"influxdb3_catalog",
|
||||
|
@ -3197,6 +3198,7 @@ dependencies = [
|
|||
"data_types",
|
||||
"futures-util",
|
||||
"hashbrown 0.15.2",
|
||||
"humantime",
|
||||
"indexmap 2.7.0",
|
||||
"influxdb-line-protocol",
|
||||
"influxdb3_id",
|
||||
|
|
|
@ -15,6 +15,7 @@ bytes.workspace = true
|
|||
chrono.workspace = true
|
||||
cron.workspace = true
|
||||
data_types.workspace = true
|
||||
humantime.workspace = true
|
||||
hashbrown.workspace = true
|
||||
hyper.workspace = true
|
||||
iox_time.workspace = true
|
||||
|
|
|
@ -112,6 +112,12 @@ impl PluginChannels {
|
|||
.or_default()
|
||||
.insert(trigger, tx);
|
||||
}
|
||||
TriggerSpecificationDefinition::Every { .. } => {
|
||||
self.active_triggers
|
||||
.entry(db)
|
||||
.or_default()
|
||||
.insert(trigger, tx);
|
||||
}
|
||||
TriggerSpecificationDefinition::RequestPath { path } => {
|
||||
self.request_triggers.insert(path.to_string(), tx);
|
||||
}
|
||||
|
@ -479,7 +485,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
trigger,
|
||||
Arc::clone(&self.time_provider),
|
||||
plugin_context,
|
||||
),
|
||||
)?,
|
||||
PluginType::Request => plugins::run_request_plugin(
|
||||
db_name.to_string(),
|
||||
plugin_code,
|
||||
|
|
|
@ -96,12 +96,7 @@ pub(crate) fn run_schedule_plugin(
|
|||
trigger_definition: TriggerDefinition,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
context: PluginContext,
|
||||
) {
|
||||
let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else {
|
||||
// TODO: these linkages should be guaranteed by code.
|
||||
unreachable!("this should've been checked");
|
||||
};
|
||||
let schedule = schedule.to_string();
|
||||
) -> Result<(), Error> {
|
||||
let trigger_plugin = TriggerPlugin {
|
||||
trigger_definition,
|
||||
db_name,
|
||||
|
@ -109,12 +104,17 @@ pub(crate) fn run_schedule_plugin(
|
|||
write_buffer: context.write_buffer,
|
||||
query_executor: context.query_executor,
|
||||
};
|
||||
let runner = python_plugin::ScheduleTriggerRunner::try_new(
|
||||
&trigger_plugin.trigger_definition.trigger,
|
||||
Arc::clone(&time_provider),
|
||||
)?;
|
||||
tokio::task::spawn(async move {
|
||||
trigger_plugin
|
||||
.run_schedule_plugin(context.trigger_rx, schedule, time_provider)
|
||||
.run_schedule_plugin(context.trigger_rx, runner, time_provider)
|
||||
.await
|
||||
.expect("cron trigger plugin failed");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "system-py")]
|
||||
|
@ -163,9 +163,10 @@ struct TriggerPlugin {
|
|||
mod python_plugin {
|
||||
use super::*;
|
||||
use anyhow::{anyhow, Context};
|
||||
use chrono::{DateTime, Utc};
|
||||
use cron::{OwnedScheduleIterator, Schedule};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use cron::{OwnedScheduleIterator, Schedule as CronSchedule};
|
||||
use data_types::NamespaceName;
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::{Body, Response, StatusCode};
|
||||
use influxdb3_catalog::catalog::DatabaseSchema;
|
||||
|
@ -219,12 +220,9 @@ mod python_plugin {
|
|||
pub(crate) async fn run_schedule_plugin(
|
||||
&self,
|
||||
mut receiver: Receiver<PluginEvent>,
|
||||
schedule: String,
|
||||
mut runner: ScheduleTriggerRunner,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Result<(), Error> {
|
||||
let schedule = Schedule::from_str(schedule.as_str())?;
|
||||
let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider));
|
||||
|
||||
loop {
|
||||
let Some(next_run_instant) = runner.next_run_time() else {
|
||||
break;
|
||||
|
@ -379,7 +377,12 @@ mod python_plugin {
|
|||
TriggerSpecificationDefinition::Schedule {
|
||||
schedule
|
||||
} => {
|
||||
return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
|
||||
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
|
||||
}
|
||||
TriggerSpecificationDefinition::Every {
|
||||
duration,
|
||||
} => {
|
||||
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
|
||||
}
|
||||
TriggerSpecificationDefinition::RequestPath { path } => {
|
||||
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
|
||||
|
@ -459,17 +462,69 @@ mod python_plugin {
|
|||
}
|
||||
}
|
||||
|
||||
struct ScheduleTriggerRunner {
|
||||
schedule: OwnedScheduleIterator<Utc>,
|
||||
enum Schedule {
|
||||
Cron(OwnedScheduleIterator<Utc>),
|
||||
Every(Duration),
|
||||
}
|
||||
|
||||
pub(crate) struct ScheduleTriggerRunner {
|
||||
schedule: Schedule,
|
||||
next_trigger_time: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl ScheduleTriggerRunner {
|
||||
fn new(cron_schedule: Schedule, time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
pub(crate) fn try_new(
|
||||
trigger_spec: &TriggerSpecificationDefinition,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Result<Self, Error> {
|
||||
match trigger_spec {
|
||||
TriggerSpecificationDefinition::AllTablesWalWrite
|
||||
| TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
|
||||
Err(anyhow!("shouldn't have table trigger for scheduled plugin").into())
|
||||
}
|
||||
TriggerSpecificationDefinition::RequestPath { .. } => {
|
||||
Err(anyhow!("shouldn't have request path trigger for scheduled plugin").into())
|
||||
}
|
||||
TriggerSpecificationDefinition::Schedule { schedule } => {
|
||||
let schedule = CronSchedule::from_str(schedule.as_str())
|
||||
.context("cron schedule should be parsable")?;
|
||||
Ok(Self::new_cron(schedule, time_provider))
|
||||
}
|
||||
TriggerSpecificationDefinition::Every { duration } => {
|
||||
// check that duration isn't longer than a year, so we avoid overflows.
|
||||
if *duration > parse_duration("1 year").unwrap() {
|
||||
return Err(
|
||||
anyhow!("schedule duration cannot be greater than 1 year").into()
|
||||
);
|
||||
}
|
||||
Ok(Self::new_every(
|
||||
Duration::from_std(*duration)
|
||||
.context("should be able to convert durations. ")?,
|
||||
time_provider,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
fn new_cron(cron_schedule: CronSchedule, time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
let mut schedule = cron_schedule.after_owned(time_provider.now().date_time());
|
||||
let next_trigger_time = schedule.next();
|
||||
Self {
|
||||
schedule,
|
||||
schedule: Schedule::Cron(schedule),
|
||||
next_trigger_time,
|
||||
}
|
||||
}
|
||||
|
||||
fn new_every(duration: Duration, time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
let now = time_provider.now().date_time();
|
||||
let duration_millis = duration.num_milliseconds();
|
||||
let now_millis = now.timestamp_millis();
|
||||
let next_trigger_millis = ((now_millis / duration_millis) + 1) * duration_millis;
|
||||
let next_trigger_time = Some(
|
||||
DateTime::from_timestamp_millis(next_trigger_millis)
|
||||
.expect("can't be out of range"),
|
||||
);
|
||||
Self {
|
||||
schedule: Schedule::Every(duration),
|
||||
next_trigger_time,
|
||||
}
|
||||
}
|
||||
|
@ -503,7 +558,10 @@ mod python_plugin {
|
|||
}
|
||||
|
||||
fn advance_time(&mut self) {
|
||||
self.next_trigger_time = self.schedule.next();
|
||||
self.next_trigger_time = match &mut self.schedule {
|
||||
Schedule::Cron(schedule) => schedule.next(),
|
||||
Schedule::Every(duration) => self.next_trigger_time.map(|time| time + *duration),
|
||||
};
|
||||
}
|
||||
|
||||
/// A funky little method to get a tokio Instant that we can call `tokio::time::sleep_until()` on.
|
||||
|
|
|
@ -25,6 +25,7 @@ cron = "0.14"
|
|||
crc32fast.workspace = true
|
||||
futures-util.workspace = true
|
||||
hashbrown.workspace = true
|
||||
humantime.workspace = true
|
||||
indexmap.workspace = true
|
||||
object_store.workspace = true
|
||||
parking_lot.workspace = true
|
||||
|
|
|
@ -12,6 +12,7 @@ use async_trait::async_trait;
|
|||
use cron::Schedule;
|
||||
use data_types::Timestamp;
|
||||
use hashbrown::HashMap;
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use indexmap::IndexMap;
|
||||
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
|
||||
use influxdb_line_protocol::v3::SeriesValue;
|
||||
|
@ -58,8 +59,11 @@ pub enum Error {
|
|||
#[error("invalid WAL file path")]
|
||||
InvalidWalFilePath,
|
||||
|
||||
#[error("failed to parse trigger from {}", trigger_spec)]
|
||||
TriggerSpecificationParseError { trigger_spec: String },
|
||||
#[error("failed to parse trigger from {trigger_spec}{}", .context.as_ref().map(|context| format!(": {context}")).unwrap_or_default())]
|
||||
TriggerSpecificationParseError {
|
||||
trigger_spec: String,
|
||||
context: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -671,6 +675,7 @@ pub enum TriggerSpecificationDefinition {
|
|||
AllTablesWalWrite,
|
||||
Schedule { schedule: String },
|
||||
RequestPath { path: String },
|
||||
Every { duration: Duration },
|
||||
}
|
||||
|
||||
impl TriggerSpecificationDefinition {
|
||||
|
@ -682,6 +687,7 @@ impl TriggerSpecificationDefinition {
|
|||
if table_name.is_empty() {
|
||||
return Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: Some("table name is empty".to_string()),
|
||||
});
|
||||
}
|
||||
Ok(TriggerSpecificationDefinition::SingleTableWalWrite {
|
||||
|
@ -689,22 +695,40 @@ impl TriggerSpecificationDefinition {
|
|||
})
|
||||
}
|
||||
"all_tables" => Ok(TriggerSpecificationDefinition::AllTablesWalWrite),
|
||||
s if s.starts_with("schedule:") => {
|
||||
let cron_schedule = s.trim_start_matches("schedule:").trim();
|
||||
s if s.starts_with("cron:") => {
|
||||
let cron_schedule = s.trim_start_matches("cron:").trim();
|
||||
if cron_schedule.is_empty() || Schedule::from_str(cron_schedule).is_err() {
|
||||
return Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: None,
|
||||
});
|
||||
}
|
||||
Ok(TriggerSpecificationDefinition::Schedule {
|
||||
schedule: cron_schedule.to_string(),
|
||||
})
|
||||
}
|
||||
s if s.starts_with("every:") => {
|
||||
let duration_str = s.trim_start_matches("every:").trim();
|
||||
let Ok(duration) = parse_duration(duration_str) else {
|
||||
return Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: Some("couldn't parse to duration".to_string()),
|
||||
});
|
||||
};
|
||||
if duration > parse_duration("1 year").unwrap() {
|
||||
return Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: Some("don't support every schedules of over 1 year".to_string()),
|
||||
});
|
||||
}
|
||||
Ok(TriggerSpecificationDefinition::Every { duration })
|
||||
}
|
||||
s if s.starts_with("request:") => {
|
||||
let path = s.trim_start_matches("request:").trim();
|
||||
if path.is_empty() {
|
||||
return Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: None,
|
||||
});
|
||||
}
|
||||
Ok(TriggerSpecificationDefinition::RequestPath {
|
||||
|
@ -713,6 +737,7 @@ impl TriggerSpecificationDefinition {
|
|||
}
|
||||
_ => Err(Error::TriggerSpecificationParseError {
|
||||
trigger_spec: spec_str.to_string(),
|
||||
context: Some("expect one of the following prefixes: 'table:', 'all_tables:', 'cron:', or 'every:'".to_string()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
@ -724,7 +749,10 @@ impl TriggerSpecificationDefinition {
|
|||
}
|
||||
TriggerSpecificationDefinition::AllTablesWalWrite => "all_tables".to_string(),
|
||||
TriggerSpecificationDefinition::Schedule { schedule } => {
|
||||
format!("schedule:{}", schedule)
|
||||
format!("cron:{}", schedule)
|
||||
}
|
||||
TriggerSpecificationDefinition::Every { duration } => {
|
||||
format!("every:{}", format_duration(*duration))
|
||||
}
|
||||
TriggerSpecificationDefinition::RequestPath { path } => {
|
||||
format!("request:{}", path)
|
||||
|
|
Loading…
Reference in New Issue