feat: remove legacy write service from influxdb_iox_client (#3043)
* feat: remove legacy write service from influxdb_iox_client

* chore: review feedback

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent 4d456dc92b
commit 88868e7496
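In short: the legacy gRPC write service client (line-protocol `write` and the flatbuffer-based `write_entry`) is removed from `influxdb_iox_client`. Its replacement is a feature-gated `write_lp` method that parses line protocol on the client (`mutable_batch_lp`), encodes the result as a protobuf `WriteRequest` (`mutable_batch_pb`), and sends it through the existing protobuf write service. Callers now also pass an explicit `default_time` applied to lines that carry no timestamp. A rough before/after sketch of the call-site migration, using the names that appear in the diff below:

    // before: line protocol was parsed server-side by the legacy write service
    let n = client.write(&db_name, "cpu,region=west user=23.2 100").await?;

    // after: parsing happens client-side; the trailing 0 is the default
    // timestamp (nanoseconds) applied to lines without one
    let n = client.write_lp(&db_name, "cpu,region=west user=23.2 100", 0).await?;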
@@ -1684,6 +1684,9 @@ dependencies = [
  "client_util",
  "futures-util",
  "generated_types",
+ "mutable_batch",
+ "mutable_batch_lp",
+ "mutable_batch_pb",
  "prost",
  "rand",
  "serde",

@@ -12,7 +12,7 @@ data_types = { path = "../data_types" }
 entry = { path = "../entry" }
 generated_types = { path = "../generated_types" }

-influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
+influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 internal_types = { path = "../internal_types" }
 iox_object_store = { path = "../iox_object_store" }

@@ -99,7 +99,6 @@ arrow_util = { path = "../arrow_util" }
 entry = { path = "../entry" }
 influxdb2_client = { path = "../influxdb2_client" }
 influxdb_storage_client = { path = "../influxdb_storage_client" }
-influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight"] }
 test_helpers = { path = "../test_helpers" }
 parking_lot = "0.11.2"
 write_buffer = { path = "../write_buffer" }

@@ -15,6 +15,7 @@ use influxdb_iox_client::{
 use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time::Duration};
 use structopt::StructOpt;
 use thiserror::Error;
+use time::TimeProvider;
 use uuid::Uuid;

 mod chunk;

@@ -303,7 +304,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
                 source: e,
             })?;

-            let lines_written = client.write(write.name, lp_data).await?;
+            let default_time = time::SystemProvider::new().now().timestamp_nanos();
+            let lines_written = client.write_lp(write.name, lp_data, default_time).await?;

             println!("{} Lines OK", lines_written);
         }

@@ -482,7 +482,7 @@ mod tests {

         let mut write = influxdb_iox_client::write::Client::new(conn.clone());
         write
-            .write(db_info.db_name(), "cpu,tag0=foo val=1 100\n")
+            .write_lp(db_info.db_name(), "cpu,tag0=foo val=1 100\n", 0)
             .await
             .unwrap();

@@ -34,7 +34,7 @@ async fn test_delete() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -25,7 +25,7 @@ async fn test_querying_deleted_database() {
         .await
         .expect("create database failed");
     let num_lines_written = write_client
-        .write(&db_name, "cpu,region=west user=12.3 100")
+        .write_lp(&db_name, "cpu,region=west user=12.3 100", 0)
         .await
         .expect("cannot write");

@@ -60,7 +60,7 @@ async fn test_querying_deleted_database() {
         .await
         .expect("create database failed");
     let num_lines_written = write_client
-        .write(&db_name, "cpu,region=east user=99.9 200")
+        .write_lp(&db_name, "cpu,region=east user=99.9 200", 0)
         .await
         .expect("cannot write");

@@ -32,7 +32,7 @@ async fn test_serving_readiness_database() {
         .set_serving_readiness(false)
         .await
         .unwrap();
-    let err = write_client.write(name, lp_data).await.unwrap_err();
+    let err = write_client.write_lp(name, lp_data, 0).await.unwrap_err();
     assert!(
         matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
         "{}",

@@ -43,7 +43,7 @@ async fn test_serving_readiness_database() {

    deployment_client.set_serving_readiness(true).await.unwrap();
    assert!(deployment_client.get_serving_readiness().await.unwrap());
-   write_client.write(name, lp_data).await.unwrap();
+   write_client.write_lp(name, lp_data, 0).await.unwrap();
 }

 // TODO(marco): add `test_serving_readiness_router` once we have some other API that we could use for testing

@@ -28,7 +28,7 @@ async fn test_mub_freeze() {
         .collect();

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.iter().join("\n"))
+        .write_lp(&db_name, lp_lines.iter().join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 20);

@@ -38,7 +38,7 @@ async fn test_mub_freeze() {
     assert_eq!(chunks[0].storage, ChunkStorage::ClosedMutableBuffer);

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.iter().take(10).join("\n"))
+        .write_lp(&db_name, lp_lines.iter().take(10).join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 10);

@@ -51,7 +51,7 @@ async fn test_mub_freeze() {
     assert_eq!(chunks[1].storage, ChunkStorage::ClosedMutableBuffer);

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.iter().take(10).join("\n"))
+        .write_lp(&db_name, lp_lines.iter().take(10).join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 10);

@@ -51,7 +51,7 @@ async fn test_serving_readiness() {
         .expect("create database failed");

    mgmt_client.set_serving_readiness(false).await.unwrap();
-   let err = write_client.write(name, lp_data).await.unwrap_err();
+   let err = write_client.write_lp(name, lp_data, 0).await.unwrap_err();
    assert!(
        matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
        "{}",

@@ -59,7 +59,7 @@ async fn test_serving_readiness() {
     );

    mgmt_client.set_serving_readiness(true).await.unwrap();
-   write_client.write(name, lp_data).await.unwrap();
+   write_client.write_lp(name, lp_data, 0).await.unwrap();
 }

 #[tokio::test]

@@ -542,7 +542,7 @@ async fn test_chunk_get() {
     ];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -711,7 +711,7 @@ async fn test_partition_get_error() {
         vec!["processes,host=foo running=4i,sleeping=514i,total=519i 1591894310000000000"];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -739,7 +739,7 @@ async fn test_list_partition_chunks() {
     ];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -802,7 +802,7 @@ async fn test_new_partition_chunk() {
     let lp_lines = vec!["cpu,region=west user=23.2 100"];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -826,7 +826,7 @@ async fn test_new_partition_chunk() {
     let lp_lines = vec!["cpu,region=west user=21.0 150"];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeeded");

@@ -906,7 +906,7 @@ async fn test_close_partition_chunk() {
     let lp_lines = vec!["cpu,region=west user=23.2 100"];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -998,7 +998,7 @@ async fn test_chunk_lifecycle() {
     let lp_lines = vec!["cpu,region=west user=23.2 100"];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -1305,7 +1305,7 @@ async fn test_unload_read_buffer() {
         .collect();

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1000);

@@ -1353,7 +1353,10 @@ async fn test_chunk_access_time() {
         .build(fixture.grpc_channel())
         .await;

-    write_client.write(&db_name, "cpu foo=1 10").await.unwrap();
+    write_client
+        .write_lp(&db_name, "cpu foo=1 10", 0)
+        .await
+        .unwrap();

     let to_datetime = |a: Option<&generated_types::google::protobuf::Timestamp>| -> DateTime<Utc> {
         a.unwrap().clone().try_into().unwrap()

@@ -1381,7 +1384,10 @@ async fn test_chunk_access_time() {
     assert_eq!(chunks.len(), 1);
     let t2 = to_datetime(chunks[0].time_of_last_access.as_ref());

-    write_client.write(&db_name, "cpu foo=1 20").await.unwrap();
+    write_client
+        .write_lp(&db_name, "cpu foo=1 20", 0)
+        .await
+        .unwrap();

     let chunks = management_client.list_chunks(&db_name).await.unwrap();
     assert_eq!(chunks.len(), 1);

@@ -1424,7 +1430,7 @@ async fn test_drop_partition() {
         .collect();

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1000);

@@ -1476,7 +1482,7 @@ async fn test_drop_partition_error() {
         .collect();

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1000);

@@ -1532,7 +1538,7 @@ async fn test_delete() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -1665,7 +1671,7 @@ async fn test_persist_partition() {
         .await;

     let num_lines_written = write_client
-        .write(&db_name, "data foo=1 10")
+        .write_lp(&db_name, "data foo=1 10", 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1);

@@ -1721,7 +1727,7 @@ async fn test_persist_partition_error() {
         .await;

     let num_lines_written = write_client
-        .write(&db_name, "data foo=1 10")
+        .write_lp(&db_name, "data foo=1 10", 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1);

@@ -30,7 +30,7 @@ async fn test_chunk_is_persisted_automatically() {
         .collect();

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("successful write");
     assert_eq!(num_lines_written, 1000);

@@ -68,7 +68,7 @@ async fn write_data(
     // Writing the same data multiple times should be compacted away
     for _ in 0..=num_duplicates {
         let num_lines_written = write_client
-            .write(db_name, payload)
+            .write_lp(db_name, payload, 0)
             .await
             .expect("successful write");
         assert_eq!(num_lines_written, payload_size as usize);

@@ -257,7 +257,7 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> Chun
     let lp_lines = vec!["cpu,region=west user=23.2 100"];

     write_client
-        .write(db_name, lp_lines.join("\n"))
+        .write_lp(db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -490,7 +490,7 @@ pub async fn create_two_partition_database(db_name: impl Into<String>, channel:
     ];

     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");
 }

@@ -22,7 +22,7 @@ async fn test_operations() {
     let lp_lines = vec!["cpu,region=west user=23.2 100"];

     write_client
-        .write(&db_name1, lp_lines.join("\n"))
+        .write_lp(&db_name1, lp_lines.join("\n"), 0)
         .await
         .expect("write succeded");

@@ -8,16 +8,11 @@ use crate::{

 use super::scenario::{create_readable_database, rand_name};
 use arrow_util::assert_batches_sorted_eq;
-use entry::{
-    lines_to_sharded_entries,
-    test_helpers::{partitioner, sharder},
-};
 use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules;
 use generated_types::influxdata::iox::management::v1::{
     node_group::Node, sink, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig,
     ShardConfig, Sink,
 };
-use influxdb_line_protocol::parse_lines;
 use std::collections::HashMap;

 #[tokio::test]

@@ -46,7 +41,7 @@ async fn test_write() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("cannot write");

@@ -54,19 +49,19 @@ async fn test_write() {

     // ---- test bad data ----
     let err = write_client
-        .write(&db_name, "XXX")
+        .write_lp(&db_name, "XXX", 0)
         .await
         .expect_err("expected write to fail");

     assert_contains!(
         err.to_string(),
-        r#"Client specified an invalid argument: Violation for field "lp_data": Invalid Line Protocol: error parsing line 1: A generic parsing error occurred"#
+        r#"Error converting lines: error parsing line 1: A generic parsing error occurred: TakeWhile1"#
     );
-    assert!(matches!(dbg!(err), WriteError::InvalidArgument(_)));
+    assert!(matches!(dbg!(err), WriteError::LinesConversion(_)));

     // ---- test non existent database ----
     let err = write_client
-        .write("Non_existent_database", lp_lines.join("\n"))
+        .write_lp("Non_existent_database", lp_lines.join("\n"), 0)
         .await
         .expect_err("expected write to fail");

@@ -83,7 +78,10 @@ async fn test_write() {
         let lp_lines: Vec<_> = (0..1_000)
             .map(|j| format!("flood,tag1={},tag2={} x={},y={} 0", i, j, i, j))
             .collect();
-        if let Err(err) = write_client.write(&db_name, lp_lines.join("\n")).await {
+        if let Err(err) = write_client
+            .write_lp(&db_name, lp_lines.join("\n"), 0)
+            .await
+        {
             maybe_err = Some(err);
             break;
         }

@@ -100,48 +98,6 @@ async fn test_write() {
     // useless. Don't append any tests after the "hard limit" test!
 }

-#[tokio::test]
-async fn test_write_entry() {
-    let fixture = ServerFixture::create_shared(ServerType::Database).await;
-    let mut write_client = fixture.write_client();
-
-    let db_name = rand_name();
-    create_readable_database(&db_name, fixture.grpc_channel()).await;
-
-    let lp_data = vec!["cpu bar=1 10", "cpu bar=2 20"].join("\n");
-
-    let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
-    let default_time = 456;
-    let sharded_entries =
-        lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
-            .unwrap();
-
-    let entry = sharded_entries.into_iter().next().unwrap().entry;
-
-    write_client.write_entry(&db_name, entry).await.unwrap();
-
-    let mut query_results = fixture
-        .flight_client()
-        .perform_query(&db_name, "select * from cpu")
-        .await
-        .unwrap();
-
-    let mut batches = Vec::new();
-    while let Some(data) = query_results.next().await.unwrap() {
-        batches.push(data);
-    }
-
-    let expected = vec![
-        "+-----+--------------------------------+",
-        "| bar | time                           |",
-        "+-----+--------------------------------+",
-        "| 1   | 1970-01-01T00:00:00.000000010Z |",
-        "| 2   | 1970-01-01T00:00:00.000000020Z |",
-        "+-----+--------------------------------+",
-    ];
-    assert_batches_sorted_eq!(&expected, &batches);
-}
-
 #[tokio::test]
 async fn test_write_routed() {
     const TEST_ROUTER_ID: u32 = 1;

@@ -288,7 +244,7 @@ async fn test_write_routed() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("cannot write");

@@ -430,7 +386,7 @@ async fn test_write_routed_errors() {
     let mut write_client = router.write_client();
     let lp_lines = vec!["cpu bar=1 100", "cpu bar=2 200"];
     let err = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .unwrap_err();

@@ -495,14 +451,14 @@ async fn test_write_dev_null() {
     let mut write_client = router.write_client();
     let lp_lines = vec!["cpu bar=1 100", "cpu bar=2 200"];
     write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("dev null eats them all");

     // Rows not matching that shard won't be send to "/dev/null".
     let lp_lines = vec!["mem bar=1 1", "mem bar=2 2"];
     let err = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .unwrap_err();

@@ -613,7 +569,7 @@ async fn test_write_routed_no_shard() {

     for (&ref db_name, &ref line) in &[(&db_name_1, line_1), (&db_name_2, line_2)] {
         let num_lines_written = write_client
-            .write(db_name, line)
+            .write_lp(db_name, line, 0)
             .await
             .expect("cannot write");
         assert_eq!(num_lines_written, 1);

@@ -709,12 +665,12 @@ async fn test_write_schema_mismatch() {
     create_readable_database(&db_name, fixture.grpc_channel()).await;

     write_client
-        .write(&db_name, "table field=1i 10")
+        .write_lp(&db_name, "table field=1i 10", 0)
         .await
         .expect("cannot write");

     let err = write_client
-        .write(&db_name, "table field=1.1 10")
+        .write_lp(&db_name, "table field=1.1 10", 0)
         .await
         .unwrap_err();
     assert_contains!(err.to_string(), "Table batch has mismatching schema");

@@ -56,7 +56,7 @@ async fn writes_go_to_write_buffer() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("cannot write");
     assert_eq!(num_lines_written, 3);

@@ -106,7 +106,7 @@ async fn writes_go_to_write_buffer_whitelist() {
     ];

     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("cannot write");
     assert_eq!(num_lines_written, 4);

@@ -249,7 +249,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
     // Writing to this database is an error; all data comes from write buffer
     let mut write_client = server.write_client();
     let err = write_client
-        .write(&db_name, "temp,region=south color=1")
+        .write_lp(&db_name, "temp,region=south color=1", 0)
         .await
         .expect_err("expected write to fail");

@@ -352,7 +352,7 @@ pub async fn test_cross_write_buffer_tracing() {
         "disk,region=east bytes=99i 200",
     ];
     let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+        .write_lp(&db_name, lp_lines.join("\n"), 0)
         .await
         .expect("cannot write");
     assert_eq!(num_lines_written, 3);

@@ -7,6 +7,7 @@ edition = "2021"
 [features]
 flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"]
 format = ["arrow", "arrow_util"]
+write_lp = ["mutable_batch", "mutable_batch_lp", "mutable_batch_pb"]

 [dependencies]
 # Workspace dependencies, in alphabetical order

@@ -19,6 +20,9 @@ arrow = { version = "6.0", optional = true }
 arrow-flight = { version = "6.0", optional = true }
 bytes = "1.0"
 futures-util = { version = "0.3.1", optional = true }
+mutable_batch = { path = "../mutable_batch", optional = true }
+mutable_batch_lp = { path = "../mutable_batch_lp", optional = true }
+mutable_batch_pb = { path = "../mutable_batch_pb", optional = true }
 prost = "0.8"
 rand = "0.8.3"
 serde = "1.0.128"

@@ -1,10 +1,11 @@
-use bytes::Bytes;
 use thiserror::Error;

-use generated_types::influxdata::iox::write::v1 as write;
-use generated_types::influxdata::iox::write::v1::write_service_client::WriteServiceClient;
-use generated_types::influxdata::pbdata::v1 as write_pb;
-use generated_types::influxdata::pbdata::v1::write_service_client::WriteServiceClient as PBWriteServiceClient;
+/// Re-export generated_types
+pub mod generated_types {
+    pub use generated_types::influxdata::pbdata::v1::*;
+}
+
+use self::generated_types::write_service_client::WriteServiceClient;

 use crate::connection::Connection;

@@ -18,6 +19,11 @@ pub enum WriteError {
     /// Server returned an invalid argument error
     #[error("Invalid argument: {}: {}", .0.code(), .0.message())]
     InvalidArgument(tonic::Status),
+
+    #[cfg(feature = "write_lp")]
+    /// Error converting lines
+    #[error("Error converting lines: {}", .0)]
+    LinesConversion(#[from] mutable_batch_lp::Error),
 }

 /// An IOx Write API client.

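Because line protocol is now parsed before any RPC is issued, malformed input surfaces as the new client-side `LinesConversion` variant rather than a server `InvalidArgument` status (the `test_write` hunk earlier in this diff updates its assertion accordingly). A sketch of how a caller might tell the two apart:

    // sketch: client-side parse failures vs. errors returned by the server
    match write_client.write_lp(&db_name, "XXX", 0).await {
        Err(WriteError::LinesConversion(e)) => eprintln!("bad line protocol: {}", e),
        Err(other) => eprintln!("write failed server-side: {}", other),
        Ok(n) => println!("{} lines written", n),
    }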
@@ -39,7 +45,7 @@ pub enum WriteError {
 ///
 /// // write a line of line procol data
 /// client
-///     .write("bananas", "cpu,region=west user=23.2 100")
+///     .write_lp("bananas", "cpu,region=west user=23.2 100",0)
 ///     .await
 ///     .expect("failed to create database");
 /// # }

@@ -47,66 +53,52 @@
 #[derive(Debug, Clone)]
 pub struct Client {
     inner: WriteServiceClient<Connection>,
-    inner_pb: PBWriteServiceClient<Connection>,
 }

 impl Client {
     /// Creates a new client with the provided connection
     pub fn new(channel: Connection) -> Self {
         Self {
-            inner: WriteServiceClient::new(channel.clone()),
-            inner_pb: PBWriteServiceClient::new(channel),
+            inner: WriteServiceClient::new(channel),
         }
     }

     /// Write the [LineProtocol] formatted data in `lp_data` to
-    /// database `name`. Returns the number of lines which were parsed
-    /// and written to the database
+    /// database `name`. Lines without a timestamp will be assigned `default_time`
+    ///
+    /// Returns the number of lines which were parsed and written to the database
     ///
     /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
-    pub async fn write(
+    #[cfg(feature = "write_lp")]
+    pub async fn write_lp(
         &mut self,
-        db_name: impl Into<String> + Send,
-        lp_data: impl Into<String> + Send,
+        db_name: impl AsRef<str> + Send,
+        lp_data: impl AsRef<str> + Send,
+        default_time: i64,
     ) -> Result<usize, WriteError> {
-        let db_name = db_name.into();
-        let lp_data = lp_data.into();
-        let response = self
-            .inner
-            .write(write::WriteRequest { db_name, lp_data })
-            .await
-            .map_err(Self::map_err)?;
+        let tables = mutable_batch_lp::lines_to_batches(lp_data.as_ref(), default_time)?;
+        let meta = mutable_batch::WriteMeta::unsequenced(None);
+        let write = mutable_batch::DbWrite::new(tables, meta);
+        let lines = write.tables().map(|(_, table)| table.rows()).sum();

-        Ok(response.into_inner().lines_written as usize)
-    }
+        let database_batch = mutable_batch_pb::encode::encode_write(db_name.as_ref(), &write);

-    /// Write an [Entry] to database `name`.
-    ///
-    /// An Entry unit of write payload encoded as Flatbuffer structure
-    /// and passed as a bytes field in the gRPC protobuf API.
-    ///
-    /// [Entry]: https://github.com/influxdata/influxdb_iox/blob/main/entry/src/entry.fbs
-    pub async fn write_entry(
-        &mut self,
-        db_name: impl Into<String> + Send,
-        entry: impl Into<Bytes> + Send,
-    ) -> Result<(), WriteError> {
-        let db_name = db_name.into();
-        let entry = entry.into();
         self.inner
-            .write_entry(write::WriteEntryRequest { db_name, entry })
+            .write(generated_types::WriteRequest {
+                database_batch: Some(database_batch),
+            })
             .await
             .map_err(Self::map_err)?;

-        Ok(())
+        Ok(lines)
     }

     /// Write a protobuf batch.
     pub async fn write_pb(
         &mut self,
-        write_request: write_pb::WriteRequest,
+        write_request: generated_types::WriteRequest,
     ) -> Result<(), WriteError> {
-        self.inner_pb
+        self.inner
             .write(write_request)
             .await
             .map_err(Self::map_err)?;

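The hunk above is the heart of the change: line protocol goes through `lines_to_batches` into a `DbWrite`, is encoded with `mutable_batch_pb::encode::encode_write`, and is sent as a plain protobuf `WriteRequest`; the returned line count is now computed client-side by summing the rows of the encoded tables instead of being echoed back by the server. The effect of the new `default_time` parameter, shown as a hypothetical call (the database name is made up):

    // the first line carries an explicit timestamp (100); the second has none
    // and is stamped with default_time (42) during client-side conversion
    client.write_lp("bananas", "cpu val=1 100\ncpu val=2", 42).await?;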
@@ -11,10 +11,8 @@
 )]

 use clap::{App, Arg};
-use generated_types::influxdata::iox::{
-    management::v1::{self as management, database_rules::*, lifecycle_rules::*, *},
-    write_buffer::v1::*,
-};
+use influxdb_iox_client::management::generated_types::*;
+use influxdb_iox_client::write::generated_types::*;

 #[tokio::main]
 async fn main() {

@@ -85,9 +83,9 @@ Examples:
             ..Default::default()
         }),
         worker_cleanup_avg_sleep: None,
-        routing_rules: Some(RoutingRules::RoutingConfig(RoutingConfig {
-            sink: Some(management::Sink {
-                sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
+        routing_rules: Some(database_rules::RoutingRules::RoutingConfig(RoutingConfig {
+            sink: Some(Sink {
+                sink: Some(sink::Sink::Kafka(KafkaProducer {})),
             }),
         })),
         write_buffer_connection: Some(WriteBufferConnection {

@@ -110,15 +108,17 @@
             buffer_size_soft: 1024 * 1024 * 1024,
             buffer_size_hard: 1024 * 1024 * 1024 * 2,
             worker_backoff_millis: 100,
-            max_active_compactions_cfg: Some(MaxActiveCompactionsCfg::MaxActiveCompactions(1)),
+            max_active_compactions_cfg: Some(
+                lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(1),
+            ),
             persist: true,
             persist_row_threshold: 10 * 1000 * 1000,
             ..Default::default()
         }),
         worker_cleanup_avg_sleep: None,
-        routing_rules: Some(RoutingRules::RoutingConfig(RoutingConfig {
-            sink: Some(management::Sink {
-                sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
+        routing_rules: Some(database_rules::RoutingRules::RoutingConfig(RoutingConfig {
+            sink: Some(Sink {
+                sink: Some(sink::Sink::Kafka(KafkaProducer {})),
             }),
         })),
         write_buffer_connection: Some(WriteBufferConnection {

@@ -144,16 +144,10 @@ Examples:

     // Write a few points
     let mut write_client = influxdb_iox_client::write::Client::new(writer_grpc_channel);
-    let lp_lines = [
-        "write_test,region=west user=23.2 100",
-        "write_test,region=west user=21.0 150",
-        "write_test,region=east bytes=99i 200",
-    ];
-    let num_lines_written = write_client
-        .write(&db_name, lp_lines.join("\n"))
+    write_client
+        .write_pb(test_write(&db_name))
         .await
         .expect("cannot write");
-    assert_eq!(num_lines_written, 3);

     // Create the reader db
     let reader_grpc_bind_addr = format!("http://{}", reader);

@@ -170,3 +164,62 @@

     println!("Created database {}", db_name);
 }
+
+/// 3 rows of test data
+///
+/// "write_test,region=west user=23.2 100"
+// "write_test,region=west user=21.0 150"
+// "write_test,region=east bytes=99i 200"
+fn test_write(db_name: &str) -> WriteRequest {
+    WriteRequest {
+        database_batch: Some(DatabaseBatch {
+            database_name: db_name.to_string(),
+            table_batches: vec![TableBatch {
+                table_name: "write_test".to_string(),
+                columns: vec![
+                    Column {
+                        column_name: "time".to_string(),
+                        semantic_type: column::SemanticType::Time as _,
+                        values: Some(column::Values {
+                            i64_values: vec![100, 150, 200],
+                            ..Default::default()
+                        }),
+                        null_mask: vec![],
+                    },
+                    Column {
+                        column_name: "region".to_string(),
+                        semantic_type: column::SemanticType::Tag as _,
+                        values: Some(column::Values {
+                            string_values: vec![
+                                "west".to_string(),
+                                "west".to_string(),
+                                "east".to_string(),
+                            ],
+                            ..Default::default()
+                        }),
+                        null_mask: vec![],
+                    },
+                    Column {
+                        column_name: "user".to_string(),
+                        semantic_type: column::SemanticType::Field as _,
+                        values: Some(column::Values {
+                            f64_values: vec![23.2, 21.0],
+                            ..Default::default()
+                        }),
+                        null_mask: vec![0b00000100],
+                    },
+                    Column {
+                        column_name: "bytes".to_string(),
+                        semantic_type: column::SemanticType::Field as _,
+                        values: Some(column::Values {
+                            i64_values: vec![99],
+                            ..Default::default()
+                        }),
+                        null_mask: vec![0b00000011],
+                    },
+                ],
+                row_count: 3,
+            }],
+        }),
+    }
+}
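A note on the `null_mask` values in `test_write` above: judging from the data in this hunk, they encode per-row nulls as an LSB-first bitmask, one bit per row. `0b00000100` marks row 2 (the east row, which has no `user` value) as null, and `0b00000011` marks rows 0 and 1 (the west rows, which have no `bytes` value). A small hypothetical helper, not part of this commit, that would produce such masks:

    // build an LSB-first null mask from per-row null flags
    fn null_mask(nulls: &[bool]) -> Vec<u8> {
        let mut mask = vec![0u8; (nulls.len() + 7) / 8];
        for (i, &is_null) in nulls.iter().enumerate() {
            if is_null {
                mask[i / 8] |= 1 << (i % 8);
            }
        }
        mask
    }

    // null_mask(&[false, false, true]) == vec![0b00000100]  // "user" column
    // null_mask(&[true, true, false]) == vec![0b00000011]   // "bytes" column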