feat: Updates to plugin API (#25799)
* Add uint64_field to LineBuilder * Add bool_field to LineBuilder * Change query_rows to query in Python API * Update error output for test wal_pluginparameterize_docker_image_name
parent
c5a0fa736e
commit
ebf78aa6c9
|
@ -76,7 +76,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||||
None => {
|
None => {
|
||||||
let file_path = plugin_config
|
let file_path = plugin_config
|
||||||
.input_file
|
.input_file
|
||||||
.context("either input_lp or input_file must be provided")?;
|
.context("either --lp or --file must be provided")?;
|
||||||
std::fs::read_to_string(file_path).context("unable to read input file")?
|
std::fs::read_to_string(file_path).context("unable to read input file")?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -911,7 +911,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
||||||
influxdb3_local.info("arg1: " + args["arg1"])
|
influxdb3_local.info("arg1: " + args["arg1"])
|
||||||
|
|
||||||
query_params = {"host": args["host"]}
|
query_params = {"host": args["host"]}
|
||||||
query_result = influxdb3_local.query_rows("SELECT * FROM cpu where host = $host", query_params)
|
query_result = influxdb3_local.query("SELECT * FROM cpu where host = $host", query_params)
|
||||||
influxdb3_local.info("query result: " + str(query_result))
|
influxdb3_local.info("query result: " + str(query_result))
|
||||||
|
|
||||||
for table_batch in table_batches:
|
for table_batch in table_batches:
|
||||||
|
|
|
@ -470,7 +470,9 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
||||||
|
|
||||||
cpu_valid = LineBuilder("cpu")\
|
cpu_valid = LineBuilder("cpu")\
|
||||||
.tag("host", "A")\
|
.tag("host", "A")\
|
||||||
.int64_field("f1", 10)
|
.int64_field("f1", 10)\
|
||||||
|
.uint64_field("f2", 20)\
|
||||||
|
.bool_field("f3", True)
|
||||||
influxdb3_local.write_to_db("foodb", cpu_valid)
|
influxdb3_local.write_to_db("foodb", cpu_valid)
|
||||||
|
|
||||||
cpu_invalid = LineBuilder("cpu")\
|
cpu_invalid = LineBuilder("cpu")\
|
||||||
|
@ -509,7 +511,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
||||||
|
|
||||||
// the lines should still come through in the output because that's what Python sent
|
// the lines should still come through in the output because that's what Python sent
|
||||||
let expected_foodb_lines = vec![
|
let expected_foodb_lines = vec![
|
||||||
"cpu,host=A f1=10i".to_string(),
|
"cpu,host=A f1=10i,f2=20u,f3=t".to_string(),
|
||||||
"cpu,host=A f1=\"not_an_int\"".to_string(),
|
"cpu,host=A f1=\"not_an_int\"".to_string(),
|
||||||
];
|
];
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|
|
@ -116,7 +116,7 @@ impl PyPluginCallApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pyo3(signature = (query, args=None))]
|
#[pyo3(signature = (query, args=None))]
|
||||||
fn query_rows(
|
fn query(
|
||||||
&self,
|
&self,
|
||||||
query: String,
|
query: String,
|
||||||
args: Option<std::collections::HashMap<String, String>>,
|
args: Option<std::collections::HashMap<String, String>>,
|
||||||
|
@ -282,6 +282,14 @@ class LineBuilder:
|
||||||
self.tags[key] = str(value)
|
self.tags[key] = str(value)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def uint64_field(self, key: str, value: int) -> 'LineBuilder':
|
||||||
|
"""Add an unsigned integer field to the line protocol."""
|
||||||
|
self._validate_key(key, "field")
|
||||||
|
if value < 0:
|
||||||
|
raise ValueError(f"uint64 field '{key}' cannot be negative")
|
||||||
|
self.fields[key] = f"{value}u"
|
||||||
|
return self
|
||||||
|
|
||||||
def int64_field(self, key: str, value: int) -> 'LineBuilder':
|
def int64_field(self, key: str, value: int) -> 'LineBuilder':
|
||||||
"""Add an integer field to the line protocol."""
|
"""Add an integer field to the line protocol."""
|
||||||
self._validate_key(key, "field")
|
self._validate_key(key, "field")
|
||||||
|
@ -303,6 +311,12 @@ class LineBuilder:
|
||||||
self.fields[key] = f'"{escaped_value}"'
|
self.fields[key] = f'"{escaped_value}"'
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def bool_field(self, key: str, value: bool) -> 'LineBuilder':
|
||||||
|
"""Add a boolean field to the line protocol."""
|
||||||
|
self._validate_key(key, "field")
|
||||||
|
self.fields[key] = 't' if value else 'f'
|
||||||
|
return self
|
||||||
|
|
||||||
def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
|
def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
|
||||||
"""Set the timestamp in nanoseconds."""
|
"""Set the timestamp in nanoseconds."""
|
||||||
self._timestamp_ns = timestamp_ns
|
self._timestamp_ns = timestamp_ns
|
||||||
|
|
Loading…
Reference in New Issue