feat: reloads plugins if modified
This updates plugins so that they will reload the code if the local file is modified. Github pugins continue to be loaded only once when they are initially created or loaded on startup. This will make iterating on plugin development locally much easier. Closes #25863pd/reload-plugin-on-modify
parent
e642efda1f
commit
96c56145d0
|
@ -1660,4 +1660,33 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
|
|||
.unwrap();
|
||||
|
||||
assert_eq!(val, json!([{"tag1": "tag1_value", "field1": 1}]));
|
||||
|
||||
// now update it to make sure that it reloads
|
||||
let plugin_code = r#"
|
||||
import json
|
||||
|
||||
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
|
||||
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "updated"})
|
||||
"#;
|
||||
// clear all bytes from the plugin file
|
||||
plugin_file.reopen().unwrap().set_len(0).unwrap();
|
||||
plugin_file
|
||||
.reopen()
|
||||
.unwrap()
|
||||
.write_all(plugin_code.as_bytes())
|
||||
.unwrap();
|
||||
|
||||
// send an HTTP request to the server
|
||||
let response = client
|
||||
.post(format!("{}/api/v3/engine/foo", server_addr))
|
||||
.header("Content-Type", "application/json")
|
||||
.query(&[("q1", "whatevs")])
|
||||
.body(r#"{"hello": "world"}"#)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = response.text().await.unwrap();
|
||||
let body = serde_json::from_str::<serde_json::Value>(&body).unwrap();
|
||||
assert_eq!(body, json!({"status": "updated"}));
|
||||
}
|
||||
|
|
|
@ -23,7 +23,9 @@ use influxdb3_write::WriteBuffer;
|
|||
use iox_time::TimeProvider;
|
||||
use observability_deps::tracing::{debug, error, warn};
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::oneshot::Receiver;
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
|
||||
|
@ -169,7 +171,7 @@ impl ProcessingEngineManagerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
|
||||
pub async fn read_plugin_code(&self, name: &str) -> Result<PluginCode, plugins::Error> {
|
||||
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
|
||||
if name.starts_with("gh:") {
|
||||
let plugin_path = name.strip_prefix("gh:").unwrap();
|
||||
|
@ -189,13 +191,76 @@ impl ProcessingEngineManagerImpl {
|
|||
.text()
|
||||
.await
|
||||
.context("error reading plugin from github repo")?;
|
||||
return Ok(resp_body);
|
||||
return Ok(PluginCode::Github(Arc::from(resp_body)));
|
||||
}
|
||||
|
||||
// otherwise we assume it is a local file
|
||||
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
|
||||
let path = plugin_dir.join(name);
|
||||
Ok(std::fs::read_to_string(path)?)
|
||||
let plugin_path = plugin_dir.join(name);
|
||||
|
||||
// read it at least once to make sure it's there
|
||||
let code = std::fs::read_to_string(plugin_path.clone())?;
|
||||
|
||||
// now we can return it
|
||||
Ok(PluginCode::Local(LocalPlugin {
|
||||
plugin_path,
|
||||
last_read_and_code: Mutex::new((SystemTime::now(), Arc::from(code))),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PluginCode {
|
||||
Github(Arc<str>),
|
||||
Local(LocalPlugin),
|
||||
}
|
||||
|
||||
impl PluginCode {
|
||||
#[cfg(feature = "system-py")]
|
||||
pub(crate) fn code(&self) -> Arc<str> {
|
||||
match self {
|
||||
PluginCode::Github(code) => Arc::clone(code),
|
||||
PluginCode::Local(plugin) => plugin.read_if_modified(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug)]
|
||||
pub struct LocalPlugin {
|
||||
plugin_path: PathBuf,
|
||||
last_read_and_code: Mutex<(SystemTime, Arc<str>)>,
|
||||
}
|
||||
|
||||
impl LocalPlugin {
|
||||
#[cfg(feature = "system-py")]
|
||||
fn read_if_modified(&self) -> Arc<str> {
|
||||
let metadata = std::fs::metadata(&self.plugin_path);
|
||||
|
||||
let mut last_read_and_code = self.last_read_and_code.lock().unwrap();
|
||||
let (last_read, code) = &mut *last_read_and_code;
|
||||
|
||||
match metadata {
|
||||
Ok(metadata) => {
|
||||
let is_modified = match metadata.modified() {
|
||||
Ok(modified) => modified > *last_read,
|
||||
Err(_) => true, // if we can't get the modified time, assume it is modified
|
||||
};
|
||||
|
||||
if is_modified {
|
||||
// attempt to read the code, if it fails we will return the last known code
|
||||
if let Ok(new_code) = std::fs::read_to_string(&self.plugin_path) {
|
||||
*last_read = SystemTime::now();
|
||||
*code = Arc::from(new_code);
|
||||
} else {
|
||||
error!("error reading plugin file {:?}", self.plugin_path);
|
||||
}
|
||||
}
|
||||
|
||||
Arc::clone(code)
|
||||
}
|
||||
Err(_) => Arc::clone(code),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -555,13 +620,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
let now = self.time_provider.now();
|
||||
|
||||
let code = self.read_plugin_code(&request.filename).await?;
|
||||
let code_string = code.code().to_string();
|
||||
|
||||
let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
|
||||
.unwrap_or_else(|e| WalPluginTestResponse {
|
||||
log_lines: vec![],
|
||||
database_writes: Default::default(),
|
||||
errors: vec![e.to_string()],
|
||||
});
|
||||
let res =
|
||||
plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
|
||||
.unwrap_or_else(|e| WalPluginTestResponse {
|
||||
log_lines: vec![],
|
||||
database_writes: Default::default(),
|
||||
errors: vec![e.to_string()],
|
||||
});
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
@ -585,15 +652,21 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
let now = self.time_provider.now();
|
||||
|
||||
let code = self.read_plugin_code(&request.filename).await?;
|
||||
let code_string = code.code().to_string();
|
||||
|
||||
let res =
|
||||
plugins::run_test_schedule_plugin(now, catalog, query_executor, code, request)
|
||||
.unwrap_or_else(|e| SchedulePluginTestResponse {
|
||||
log_lines: vec![],
|
||||
database_writes: Default::default(),
|
||||
errors: vec![e.to_string()],
|
||||
trigger_time: None,
|
||||
});
|
||||
let res = plugins::run_test_schedule_plugin(
|
||||
now,
|
||||
catalog,
|
||||
query_executor,
|
||||
code_string,
|
||||
request,
|
||||
)
|
||||
.unwrap_or_else(|e| SchedulePluginTestResponse {
|
||||
log_lines: vec![],
|
||||
database_writes: Default::default(),
|
||||
errors: vec![e.to_string()],
|
||||
trigger_time: None,
|
||||
});
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
|
|
@ -1,17 +1,26 @@
|
|||
#[cfg(feature = "system-py")]
|
||||
use crate::PluginCode;
|
||||
#[cfg(feature = "system-py")]
|
||||
use crate::PluginEvent;
|
||||
use data_types::NamespaceName;
|
||||
use hashbrown::HashMap;
|
||||
use influxdb3_catalog::catalog::Catalog;
|
||||
#[cfg(feature = "system-py")]
|
||||
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
|
||||
#[cfg(feature = "system-py")]
|
||||
use influxdb3_internal_api::query_executor::QueryExecutor;
|
||||
use influxdb3_wal::Gen1Duration;
|
||||
#[cfg(feature = "system-py")]
|
||||
use influxdb3_wal::TriggerDefinition;
|
||||
#[cfg(feature = "system-py")]
|
||||
use influxdb3_wal::TriggerSpecificationDefinition;
|
||||
use influxdb3_write::write_buffer;
|
||||
use influxdb3_write::write_buffer::validator::WriteValidator;
|
||||
use influxdb3_write::Precision;
|
||||
#[cfg(feature = "system-py")]
|
||||
use influxdb3_write::WriteBuffer;
|
||||
#[cfg(feature = "system-py")]
|
||||
use iox_time::TimeProvider;
|
||||
use observability_deps::tracing::error;
|
||||
use std::fmt::Debug;
|
||||
#[cfg(feature = "system-py")]
|
||||
|
@ -21,14 +30,6 @@ use thiserror::Error;
|
|||
#[cfg(feature = "system-py")]
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use data_types::NamespaceName;
|
||||
use hashbrown::HashMap;
|
||||
use influxdb3_wal::Gen1Duration;
|
||||
use influxdb3_write::write_buffer::validator::WriteValidator;
|
||||
use influxdb3_write::Precision;
|
||||
#[cfg(feature = "system-py")]
|
||||
use iox_time::TimeProvider;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("invalid database {0}")]
|
||||
|
@ -69,7 +70,7 @@ pub enum Error {
|
|||
#[cfg(feature = "system-py")]
|
||||
pub(crate) fn run_wal_contents_plugin(
|
||||
db_name: String,
|
||||
plugin_code: String,
|
||||
plugin_code: PluginCode,
|
||||
trigger_definition: TriggerDefinition,
|
||||
context: PluginContext,
|
||||
) {
|
||||
|
@ -91,7 +92,7 @@ pub(crate) fn run_wal_contents_plugin(
|
|||
#[cfg(feature = "system-py")]
|
||||
pub(crate) fn run_schedule_plugin(
|
||||
db_name: String,
|
||||
plugin_code: String,
|
||||
plugin_code: PluginCode,
|
||||
trigger_definition: TriggerDefinition,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
context: PluginContext,
|
||||
|
@ -119,7 +120,7 @@ pub(crate) fn run_schedule_plugin(
|
|||
#[cfg(feature = "system-py")]
|
||||
pub(crate) fn run_request_plugin(
|
||||
db_name: String,
|
||||
plugin_code: String,
|
||||
plugin_code: PluginCode,
|
||||
trigger_definition: TriggerDefinition,
|
||||
context: PluginContext,
|
||||
) {
|
||||
|
@ -152,7 +153,7 @@ pub(crate) struct PluginContext {
|
|||
#[derive(Debug)]
|
||||
struct TriggerPlugin {
|
||||
trigger_definition: TriggerDefinition,
|
||||
plugin_code: String,
|
||||
plugin_code: PluginCode,
|
||||
db_name: String,
|
||||
write_buffer: Arc<dyn WriteBuffer>,
|
||||
query_executor: Arc<dyn QueryExecutor>,
|
||||
|
@ -283,7 +284,7 @@ mod python_plugin {
|
|||
return Err(Error::MissingDb);
|
||||
};
|
||||
let result = execute_request_trigger(
|
||||
&self.plugin_code,
|
||||
self.plugin_code.code().as_ref(),
|
||||
Arc::clone(&schema),
|
||||
Arc::clone(&self.query_executor),
|
||||
&self.trigger_definition.trigger_arguments,
|
||||
|
@ -386,7 +387,7 @@ mod python_plugin {
|
|||
};
|
||||
|
||||
let result = execute_python_with_batch(
|
||||
&self.plugin_code,
|
||||
self.plugin_code.code().as_ref(),
|
||||
write_batch,
|
||||
Arc::clone(&schema),
|
||||
Arc::clone(&self.query_executor),
|
||||
|
@ -483,7 +484,7 @@ mod python_plugin {
|
|||
};
|
||||
|
||||
let result = execute_schedule_trigger(
|
||||
&plugin.plugin_code,
|
||||
plugin.plugin_code.code().as_ref(),
|
||||
trigger_time,
|
||||
Arc::clone(&db_schema),
|
||||
Arc::clone(&plugin.query_executor),
|
||||
|
|
Loading…
Reference in New Issue