fix(processing_engine): properly extract data from DictionaryArrays (#26204)
parent
a8e12803b0
commit
d245b2e886
|
@ -1280,6 +1280,55 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
|
|||
);
|
||||
assert_eq!(result["errors"], expected_result["errors"]);
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_tag_query_behavior() {
|
||||
use crate::server::ConfigProvider;
|
||||
use influxdb3_client::Precision;
|
||||
|
||||
// Create plugin file with a scheduled task
|
||||
let (temp_dir, plugin_path) = create_plugin_in_temp_dir(
|
||||
r#"
|
||||
def process_scheduled_call(influxdb3_local, time, args=None):
|
||||
influxdb3_local.query("SELECT val1,val2,tag1 FROM foo")
|
||||
influxdb3_local.info("successfully queried")"#,
|
||||
);
|
||||
|
||||
let plugin_dir = temp_dir.path().to_str().unwrap();
|
||||
let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap();
|
||||
|
||||
let server = TestServer::configure()
|
||||
.with_plugin_dir(plugin_dir)
|
||||
.spawn()
|
||||
.await;
|
||||
|
||||
// Write some test data
|
||||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
r#"foo,tag1=bar,tag2=blep val1=5,val2=99
|
||||
foo,tag1=bar,tag2=blep val1=10,val2=99
|
||||
foo,tag1=bar,tag2=blep val1=10,val2=199
|
||||
foo,tag1=bar,tag2=mlem val1=10,val2=199
|
||||
foo,tag1=bar,tag2=mloem val1=5,val2=199"#,
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let db_name = "foo";
|
||||
|
||||
let result = server
|
||||
.test_schedule_plugin(db_name, plugin_name, "*/5 * * * * *")
|
||||
.add_input_argument("region=us-east")
|
||||
.run()
|
||||
.expect("Failed to run schedule plugin test");
|
||||
|
||||
debug!(result = ?result, "test schedule plugin");
|
||||
assert_eq!(result["errors"], json!([]));
|
||||
assert_eq!(result["log_lines"], json!(["INFO: successfully queried"]));
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_schedule_plugin_test_with_strftime() {
|
||||
use crate::server::ConfigProvider;
|
||||
|
|
|
@ -8,8 +8,8 @@ use crate::system_py::CacheId::{Global, GlobalTest, Trigger, TriggerTest};
|
|||
use anyhow::{Context, bail};
|
||||
use arrow_array::types::Int32Type;
|
||||
use arrow_array::{
|
||||
BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
|
||||
TimestampNanosecondArray, UInt64Array,
|
||||
Array, BooleanArray, DictionaryArray, Float64Array, Int32Array, Int64Array, RecordBatch,
|
||||
StringArray, TimestampNanosecondArray, UInt64Array,
|
||||
};
|
||||
use arrow_schema::DataType;
|
||||
use bytes::Bytes;
|
||||
|
@ -278,13 +278,19 @@ impl PyPluginCallApi {
|
|||
.downcast_ref::<DictionaryArray<Int32Type>>()
|
||||
.expect("unexpected datatype");
|
||||
|
||||
let keys = col
|
||||
.keys()
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.expect("unexpected datatype");
|
||||
|
||||
let values = col.values();
|
||||
let values = values
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.expect("unexpected datatype");
|
||||
|
||||
let val = values.value(row_idx).to_string();
|
||||
let val = values.value(keys.value(row_idx) as usize).to_string();
|
||||
row.set_item(field_name, val)?;
|
||||
}
|
||||
_ => {
|
||||
|
|
Loading…
Reference in New Issue