feat: load plugins from our github plugin repo (#25861)

Adds the ability to load a plugin from our public github repo here: https://github.com/influxdata/influxdb3_pluginshttps://github.com/influxdata/influxdb3_plugins

If the plugin filename is specified as `gh:examples/wal_plugin` it will pull from the github repo at `<dir>/<name>/<name>.py`.

Closes #25836
pull/25852/head
Paul Dix 2025-01-17 15:44:42 -05:00 committed by GitHub
parent 8ea6d65f8e
commit b39b48a2cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 89 additions and 3 deletions

1
Cargo.lock generated
View File

@ -2991,6 +2991,7 @@ dependencies = [
"observability_deps",
"parquet_file",
"pyo3",
"reqwest 0.11.27",
"tempfile",
"thiserror 1.0.69",
"tokio",

View File

@ -1286,3 +1286,64 @@ def process_writes(influxdb3_local, table_batches, args=None):
);
}
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_load_wal_plugin_from_gh() {
use crate::ConfigProvider;
use influxdb3_client::Precision;
let plugin_dir = TempDir::new().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir.path().to_str().unwrap())
.spawn()
.await;
let server_addr = server.client_addr();
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
.unwrap();
let db_name = "foo";
// this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py
let plugin_name = "gh:examples/wal_plugin";
// Run the test to make sure it'll load from GH
let result = run_with_confirmation(&[
"test",
"wal_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--lp",
"test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
plugin_name,
]);
debug!(result = ?result, "test wal plugin");
let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
let expected_result = r#"{
"log_lines": [
"INFO: wal_plugin.py done"
],
"database_writes": {
"foo": [
"write_reports,table_name=test_input row_count=1i"
]
},
"errors": []
}"#;
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}

View File

@ -21,6 +21,7 @@ influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true

View File

@ -117,7 +117,30 @@ impl ProcessingEngineManagerImpl {
}
}
pub fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
pub async fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
if name.starts_with("gh:") {
let plugin_path = name.strip_prefix("gh:").unwrap();
// the filename should be the last part of the name after the last /
let plugin_name = plugin_path
.split('/')
.last()
.context("plugin name for github plugins must be <dir>/<name>")?;
let url = format!(
"https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py",
plugin_path, plugin_name
);
let resp = reqwest::get(&url)
.await
.context("error getting plugin from github repo")?;
let resp_body = resp
.text()
.await
.context("error reading plugin from github repo")?;
return Ok(resp_body);
}
// otherwise we assume it is a local file
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
let path = plugin_dir.join(name);
Ok(std::fs::read_to_string(path)?)
@ -315,7 +338,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
};
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name)?;
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context);
}
@ -449,7 +472,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
let now = self.time_provider.now();
let code = self.read_plugin_code(&request.filename)?;
let code = self.read_plugin_code(&request.filename).await?;
let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| WalPluginTestResponse {