Merge branch 'main' into cn/sort-key-catalog

pull/24376/head
kodiakhq[bot] 2022-04-04 16:54:48 +00:00 committed by GitHub
commit e2439c0a4f
53 changed files with 865 additions and 316 deletions

Cargo.lock (generated)

@ -130,8 +130,8 @@ dependencies = [
"bytes",
"futures",
"proc-macro2",
"prost",
"prost-derive",
"prost 0.9.0",
"prost-derive 0.9.0",
"tokio",
"tonic",
"tonic-build",
@ -615,9 +615,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.1.7"
version = "3.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c67e7973e74896f4bba06ca2dcfd28d54f9cb8c035e940a32b88ed48f5f5ecf2"
checksum = "71c47df61d9e16dc010b55dba1952a57d8c215dbb533fd13cdd13369aac73b1c"
dependencies = [
"atty",
"bitflags",
@ -634,7 +634,7 @@ dependencies = [
name = "clap_blocks"
version = "0.1.0"
dependencies = [
"clap 3.1.7",
"clap 3.1.8",
"data_types",
"futures",
"iox_catalog",
@ -670,7 +670,7 @@ name = "client_util"
version = "0.1.0"
dependencies = [
"http",
"prost",
"prost 0.9.0",
"thiserror",
"tokio",
"tonic",
@ -1118,7 +1118,7 @@ dependencies = [
"pin-project-lite",
"rand",
"smallvec",
"sqlparser",
"sqlparser 0.15.0",
"tempfile",
"tokio",
"tokio-stream",
@ -1133,7 +1133,7 @@ dependencies = [
"arrow",
"ordered-float 2.10.0",
"parquet",
"sqlparser",
"sqlparser 0.15.0",
]
[[package]]
@ -1157,7 +1157,7 @@ dependencies = [
"ahash",
"arrow",
"datafusion-common",
"sqlparser",
"sqlparser 0.15.0",
]
[[package]]
@ -1683,11 +1683,11 @@ dependencies = [
"datafusion 0.1.0",
"num_cpus",
"observability_deps",
"pbjson",
"pbjson-build",
"pbjson 0.3.0",
"pbjson-build 0.3.0",
"pbjson-types",
"predicate",
"prost",
"prost 0.9.0",
"prost-build",
"regex",
"serde",
@ -1751,9 +1751,9 @@ dependencies = [
"grpc-router-test-gen",
"observability_deps",
"paste",
"prost",
"prost 0.9.0",
"prost-build",
"prost-types",
"prost-types 0.9.0",
"thiserror",
"tokio",
"tokio-stream",
@ -1767,9 +1767,9 @@ dependencies = [
name = "grpc-router-test-gen"
version = "0.1.0"
dependencies = [
"prost",
"prost 0.9.0",
"prost-build",
"prost-types",
"prost-types 0.9.0",
"tonic",
"tonic-build",
]
@ -2116,7 +2116,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 3.1.7",
"clap 3.1.8",
"clap_blocks",
"comfy-table",
"compactor",
@ -2145,12 +2145,14 @@ dependencies = [
"iox_catalog",
"iox_object_store",
"iox_tests",
"ioxd",
"ioxd_common",
"ioxd_compactor",
"ioxd_database",
"ioxd_ingester",
"ioxd_querier",
"ioxd_router",
"ioxd_router2",
"ioxd_test",
"itertools",
"job_registry",
"libc",
@ -2175,7 +2177,7 @@ dependencies = [
"pprof 0.7.0",
"predicate",
"predicates",
"prost",
"prost 0.9.0",
"querier",
"query",
"rand",
@ -2231,7 +2233,7 @@ dependencies = [
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"prost",
"prost 0.9.0",
"rand",
"serde",
"serde_json",
@ -2260,7 +2262,7 @@ dependencies = [
"client_util",
"futures-util",
"generated_types",
"prost",
"prost 0.9.0",
"tonic",
"workspace-hack",
]
@ -2286,7 +2288,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"snafu",
"sqlparser",
"sqlparser 0.16.0",
"workspace-hack",
]
@ -2323,7 +2325,7 @@ dependencies = [
"parquet_file",
"pin-project",
"predicate",
"prost",
"prost 0.9.0",
"query",
"schema",
"snafu",
@ -2411,7 +2413,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"chrono-english",
"clap 3.1.7",
"clap 3.1.8",
"criterion",
"data_types",
"futures",
@ -2440,7 +2442,7 @@ dependencies = [
"assert_matches",
"async-trait",
"chrono",
"clap 3.1.7",
"clap 3.1.8",
"dotenv",
"futures",
"glob",
@ -2449,8 +2451,8 @@ dependencies = [
"kube-derive",
"kube-runtime",
"parking_lot 0.11.2",
"pbjson-build",
"prost",
"pbjson-build 0.3.0",
"prost 0.9.0",
"schemars",
"serde",
"serde_json",
@ -2502,63 +2504,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd"
version = "0.1.0"
dependencies = [
"ansi_term",
"arrow-flight",
"arrow_util",
"assert_cmd",
"async-trait",
"clap 3.1.7",
"clap_blocks",
"compactor",
"data_types2",
"dml",
"futures",
"generated_types",
"hashbrown 0.12.0",
"http",
"hyper",
"ingester",
"iox_catalog",
"iox_tests",
"ioxd_common",
"metric",
"metric_exporters",
"mutable_batch",
"num_cpus",
"object_store",
"observability_deps",
"panic_logging",
"pprof 0.7.0",
"query",
"router",
"router2",
"server",
"service_common",
"service_grpc_flight",
"service_grpc_influxrpc",
"service_grpc_testing",
"snafu",
"tempfile",
"test_helpers",
"thiserror",
"time 0.1.0",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_exporters",
"trace_http",
"workspace-hack",
"write_buffer",
]
[[package]]
name = "ioxd_common"
version = "0.1.0"
@ -2566,7 +2511,7 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"clap 3.1.7",
"clap 3.1.8",
"clap_blocks",
"data_types",
"dml",
@ -2582,7 +2527,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.0",
"predicate",
"prost",
"prost 0.9.0",
"reqwest",
"serde",
"serde_json",
@ -2600,6 +2545,35 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd_compactor"
version = "0.1.0"
dependencies = [
"async-trait",
"clap_blocks",
"compactor",
"data_types2",
"generated_types",
"hyper",
"iox_catalog",
"ioxd_common",
"metric",
"object_store",
"query",
"service_grpc_testing",
"thiserror",
"time 0.1.0",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_http",
"workspace-hack",
]
[[package]]
name = "ioxd_database"
version = "0.1.0"
@ -2609,7 +2583,7 @@ dependencies = [
"arrow_util",
"async-trait",
"bytes",
"clap 3.1.7",
"clap 3.1.8",
"clap_blocks",
"data_types",
"db",
@ -2626,7 +2600,7 @@ dependencies = [
"mutable_batch_pb",
"object_store",
"observability_deps",
"prost",
"prost 0.9.0",
"query",
"reqwest",
"schema",
@ -2654,6 +2628,36 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd_ingester"
version = "0.1.0"
dependencies = [
"async-trait",
"clap_blocks",
"data_types2",
"generated_types",
"hyper",
"ingester",
"iox_catalog",
"ioxd_common",
"metric",
"object_store",
"query",
"service_grpc_testing",
"thiserror",
"time 0.1.0",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_http",
"workspace-hack",
"write_buffer",
]
[[package]]
name = "ioxd_querier"
version = "0.1.0"
@ -2747,6 +2751,30 @@ dependencies = [
"trace_http",
"workspace-hack",
"write_buffer",
"write_summary",
]
[[package]]
name = "ioxd_test"
version = "0.1.0"
dependencies = [
"async-trait",
"clap 3.1.8",
"generated_types",
"hyper",
"ioxd_common",
"metric",
"service_grpc_testing",
"snafu",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_http",
"workspace-hack",
]
[[package]]
@ -3373,7 +3401,7 @@ dependencies = [
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"prost",
"prost 0.9.0",
]
[[package]]
@ -3831,7 +3859,7 @@ dependencies = [
"pbjson-types",
"persistence_windows",
"predicate",
"prost",
"prost 0.9.0",
"schema",
"snafu",
"tempfile",
@ -3868,7 +3896,7 @@ dependencies = [
"pbjson-types",
"persistence_windows",
"predicate",
"prost",
"prost 0.9.0",
"schema",
"snafu",
"tempfile",
@ -3898,6 +3926,16 @@ dependencies = [
"serde",
]
[[package]]
name = "pbjson"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86c0a61b93c50f35af5d8a4f134790f47cbebf8803a7219dd1e7238cd1af022"
dependencies = [
"base64 0.13.0",
"serde",
]
[[package]]
name = "pbjson-build"
version = "0.2.3"
@ -3906,8 +3944,20 @@ checksum = "f7ded6959888ee91bc803eb467411e416181ce68c125955338c2ad7dfbfa610d"
dependencies = [
"heck 0.4.0",
"itertools",
"prost",
"prost-types",
"prost 0.9.0",
"prost-types 0.9.0",
]
[[package]]
name = "pbjson-build"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956a449e8a85fc040e9f8cd8fd4dd2e68059d179092401f0d8570ba059f76dae"
dependencies = [
"heck 0.4.0",
"itertools",
"prost 0.10.0",
"prost-types 0.10.0",
]
[[package]]
@ -3918,9 +3968,9 @@ checksum = "58d94ffa4c36eb9d09fb8e0461f8256347d1e48793f53a8a210b43726f4ec884"
dependencies = [
"bytes",
"chrono",
"pbjson",
"pbjson-build",
"prost",
"pbjson 0.2.3",
"pbjson-build 0.2.3",
"prost 0.9.0",
"prost-build",
"serde",
]
@ -4104,9 +4154,9 @@ dependencies = [
"log",
"nix",
"parking_lot 0.11.2",
"prost",
"prost 0.9.0",
"prost-build",
"prost-derive",
"prost-derive 0.9.0",
"smallvec",
"symbolic-demangle",
"tempfile",
@ -4128,9 +4178,9 @@ dependencies = [
"nix",
"once_cell",
"parking_lot 0.12.0",
"prost",
"prost 0.9.0",
"prost-build",
"prost-derive",
"prost-derive 0.9.0",
"smallvec",
"symbolic-demangle",
"tempfile",
@ -4160,7 +4210,7 @@ dependencies = [
"schema",
"serde_json",
"snafu",
"sqlparser",
"sqlparser 0.16.0",
"test_helpers",
"tokio",
"workspace-hack",
@ -4198,9 +4248,9 @@ dependencies = [
[[package]]
name = "pretty_assertions"
version = "1.2.0"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c038cb5319b9c704bf9c227c261d275bfec0ad438118a2787ce47944fb228b"
checksum = "c89f989ac94207d048d92db058e4f6ec7342b0971fc58d1271ca148b799b3563"
dependencies = [
"ansi_term",
"ctor",
@ -4279,7 +4329,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.9.0",
]
[[package]]
name = "prost"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bd5316aa8f5c82add416dfbc25116b84b748a21153f512917e8143640a71bbd"
dependencies = [
"bytes",
"prost-derive 0.10.0",
]
[[package]]
@ -4295,8 +4355,8 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"prost 0.9.0",
"prost-types 0.9.0",
"regex",
"tempfile",
"which",
@ -4315,6 +4375,19 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-derive"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df35198f0777b75e9ff669737c6da5136b59dba33cf5a010a6d1cc4d56defc6f"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.9.0"
@ -4322,7 +4395,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
dependencies = [
"bytes",
"prost",
"prost 0.9.0",
]
[[package]]
name = "prost-types"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "926681c118ae6e512a3ccefd4abbe5521a14f4cc1e207356d4d00c0b7f2006fd"
dependencies = [
"bytes",
"prost 0.10.0",
]
[[package]]
@ -4353,7 +4436,7 @@ dependencies = [
"pin-project",
"predicate",
"proptest",
"prost",
"prost 0.9.0",
"query",
"rand",
"schema",
@ -4755,6 +4838,7 @@ dependencies = [
"trace",
"workspace-hack",
"write_buffer",
"write_summary",
]
[[package]]
@ -5347,7 +5431,7 @@ dependencies = [
"panic_logging",
"parking_lot 0.12.0",
"predicate",
"prost",
"prost 0.9.0",
"query",
"regex",
"schema",
@ -5556,6 +5640,15 @@ dependencies = [
"log",
]
[[package]]
name = "sqlparser"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9a527b68048eb95495a1508f6c8395c8defcff5ecdbe8ad4106d08a2ef2a3c"
dependencies = [
"log",
]
[[package]]
name = "sqlx"
version = "0.5.11"
@ -6108,8 +6201,8 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.9.0",
"prost-derive 0.9.0",
"tokio",
"tokio-stream",
"tokio-util 0.6.9",
@ -6140,7 +6233,7 @@ checksum = "7ae388bee1d4e52c9dc334f0d5918757b07b3ffafafd7953d254c7a0e8605e02"
dependencies = [
"async-stream",
"bytes",
"prost",
"prost 0.9.0",
"tokio",
"tokio-stream",
"tonic",
@ -6154,8 +6247,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228cc5aa5d3e6e0624b5f756a7558038ee86428d1d58d8c6e551b389b12cf355"
dependencies = [
"bytes",
"prost",
"prost-types",
"prost 0.9.0",
"prost-types 0.9.0",
"tokio",
"tokio-stream",
"tonic",
@ -6231,7 +6324,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"clap 3.1.7",
"clap 3.1.8",
"futures",
"observability_deps",
"snafu",
@ -6286,9 +6379,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee"
dependencies = [
"lazy_static",
"valuable",
@ -6327,9 +6420,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.9"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce"
checksum = "b9df98b037d039d03400d9dd06b0f8ce05486b5f25e9a2d7d36196e142ebbc52"
dependencies = [
"ansi_term",
"lazy_static",
@ -6376,7 +6469,7 @@ dependencies = [
name = "trogging"
version = "0.1.0"
dependencies = [
"clap 3.1.7",
"clap 3.1.8",
"logfmt",
"observability_deps",
"regex",
@ -6834,8 +6927,8 @@ dependencies = [
"once_cell",
"parquet",
"predicates",
"prost",
"prost-types",
"prost 0.9.0",
"prost-types 0.9.0",
"rand",
"regex",
"regex-automata",
@ -6887,7 +6980,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.0",
"pin-project",
"prost",
"prost 0.9.0",
"rskafka",
"schema",
"tempfile",
@ -6901,6 +6994,19 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "write_summary"
version = "0.1.0"
dependencies = [
"base64 0.13.0",
"data_types",
"dml",
"generated_types",
"serde_json",
"time 0.1.0",
"workspace-hack",
]
[[package]]
name = "xml-rs"
version = "0.8.4"


@ -30,12 +30,14 @@ members = [
"iox_gitops_adapter",
"iox_object_store",
"iox_tests",
"ioxd",
"ioxd_common",
"ioxd_compactor",
"ioxd_ingester",
"ioxd_querier",
"ioxd_database",
"ioxd_router",
"ioxd_router2",
"ioxd_test",
"job_registry",
"lifecycle",
"logfmt",
@ -78,6 +80,7 @@ members = [
"trogging",
"workspace-hack",
"write_buffer",
"write_summary",
]
default-members = ["influxdb_iox"]


@ -18,7 +18,7 @@ use iox_catalog::interface::{Catalog, Transaction};
use iox_object_store::ParquetFilePath;
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
use object_store::DynObjectStore;
use observability_deps::tracing::warn;
use observability_deps::tracing::{info, warn};
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::{
compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner,
@ -363,6 +363,7 @@ impl Compactor {
partition_id: PartitionId,
compaction_max_size_bytes: i64,
) -> Result<()> {
info!("compacting partition {}", partition_id);
let start_time = self.time_provider.now();
let parquet_files = self
@ -388,6 +389,7 @@ impl Compactor {
// Attach appropriate tombstones to each file
let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?;
info!("compacting {} groups", groups_with_tombstones.len());
// Compact, persist, and update the catalog accordingly for each overlapped file
let mut tombstones = BTreeMap::new();
@ -407,6 +409,7 @@ impl Compactor {
// deleted. These should already be unique, no need to dedupe.
let original_parquet_file_ids: Vec<_> =
group.parquet_files.iter().map(|f| f.data.id).collect();
info!("compacting group of files: {:?}", original_parquet_file_ids);
// compact
let split_compacted_files = self.compact(group.parquet_files).await?;


@ -172,9 +172,15 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
let compactor = Arc::clone(&compactor);
let partition_id = c.partition_id;
let handle = tokio::task::spawn(async move {
compactor
if let Err(e) = compactor
.compact_partition(partition_id, max_file_size)
.await
{
warn!(
"compaction on partition {} failed with: {:?}",
partition_id, e
);
}
});
used_size += c.file_size_bytes;
handles.push(handle);
@ -185,9 +191,7 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
let compactions_run = handles.len();
if let Err(e) = futures::future::try_join_all(handles).await {
warn!("error compacting: {}", e);
}
let _ = futures::future::join_all(handles).await;
// if all candidate partitions have been compacted, wait a bit before checking again
if compactions_run == candidates.len() {


@ -819,3 +819,15 @@ impl IngesterQueryRequest {
}
}
}
/// The information on what sequence numbers were stored for a
/// particular (line protocol) write that may have been sharded /
/// partitioned across multiple sequencers
///
/// This information can be used to wait for a particular write to
/// become readable.
#[derive(Debug, Clone)]
pub struct SequencerWrites {
/// List of sequences
pub sequencers: Vec<Sequence>,
}
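
As a rough sketch of intended use (hypothetical, not part of this commit; assumes the `Sequence` type from `data_types::sequence`, as used in the `write_summary` tests later in this diff), a write sharded across two sequencers might be recorded as:

// Hypothetical: one logical write that landed on sequencer 1 (sequence
// number 2) and sequencer 2 (sequence number 3).
let writes = SequencerWrites {
    sequencers: vec![Sequence::new(1, 2), Sequence::new(2, 3)],
};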


@ -14,7 +14,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts a [`predicate::predicate::Predicate`] into [`read_buffer::Predicate`],
/// Converts a [`predicate::Predicate`] into [`read_buffer::Predicate`],
/// suitable for evaluating on the ReadBuffer.
///
/// NOTE: a valid Read Buffer predicate is not guaranteed to be applicable to an


@ -10,7 +10,7 @@ data_types = { path = "../data_types", optional = true }
data_types2 = { path = "../data_types2", optional = true }
datafusion = { path = "../datafusion", optional = true }
observability_deps = { path = "../observability_deps" }
pbjson = "0.2"
pbjson = "0.3"
pbjson-types = "0.2"
predicate = { path = "../predicate", optional = true }
prost = "0.9"
@ -27,7 +27,7 @@ num_cpus = "1.13.0"
[build-dependencies] # In alphabetical order
tonic-build = "0.6"
prost-build = "0.9"
pbjson-build = "0.2"
pbjson-build = "0.3"
[features]
default = ["data_types_conversions"]


@ -43,6 +43,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let schema_path = root.join("influxdata/iox/schema/v1");
let storage_path = root.join("influxdata/platform/storage");
let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
let write_summary_path = root.join("influxdata/iox/write_summary/v1");
let proto_files = vec![
delete_path.join("service.proto"),
@ -77,6 +78,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
storage_path.join("storage_common.proto"),
storage_path.join("test.proto"),
write_buffer_path.join("write_buffer.proto"),
write_summary_path.join("write_summary.proto"),
];
// Tell cargo to recompile if any of these proto files are changed


@ -0,0 +1,18 @@
syntax = "proto3";
package influxdata.iox.write_summary.v1;
// Represents a single logical write that was partitioned and sharded
// into multiple pieces across multiple sequencers
message WriteSummary {
// per sequencer information
repeated SequencerWrite sequencers = 1;
}
// Per sequencer information about which sequence numbers contain part of a write
message SequencerWrite {
// Unique sequencer ID.
uint32 sequencer_id = 1;
// Which sequence numbers for this sequencer had data
repeated uint64 sequence_numbers = 13;
}
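
For illustration, the prost-generated Rust type for this message is populated exactly as in the `write_summary` tests later in this diff (assuming the alias `use generated_types::influxdata::iox::write_summary::v1 as proto;` from write_summary/src/lib.rs):

// A write whose data went to sequencer 1 at sequence numbers 2 and 3.
let summary = proto::WriteSummary {
    sequencers: vec![proto::SequencerWrite {
        sequencer_id: 1,
        sequence_numbers: vec![2, 3],
    }],
};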


@ -156,6 +156,19 @@ pub mod influxdata {
));
}
}
pub mod write_summary {
pub mod v1 {
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_summary.v1.serde.rs"
));
}
}
}
pub mod pbdata {


@ -24,11 +24,13 @@ influxrpc_parser = { path = "../influxrpc_parser"}
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"}
ioxd_database = { path = "../ioxd_database"}
ioxd_ingester = { path = "../ioxd_ingester"}
ioxd_router = { path = "../ioxd_router"}
ioxd_router2 = { path = "../ioxd_router2"}
ioxd_querier = { path = "../ioxd_querier"}
ioxd = { path = "../ioxd", default-features = false }
ioxd_test = { path = "../ioxd_test"}
job_registry = { path = "../job_registry" }
logfmt = { path = "../logfmt" }
metric = { path = "../metric" }


@ -10,15 +10,11 @@ use clap_blocks::{
write_buffer::WriteBufferConfig,
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_compactor::create_compactor_server_type;
use ioxd_ingester::create_ingester_server_type;
use ioxd_querier::create_querier_server_type;
use ioxd_router2::create_router2_server_type;
use ioxd::{
self,
server_type::{compactor::create_compactor_server_type, ingester::create_ingester_server_type},
Service,
};
use object_store::{DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use query::exec::Executor;
@ -60,10 +56,10 @@ pub enum Error {
Router2(#[from] ioxd_router2::Error),
#[error("Ingester error: {0}")]
Ingester(#[from] ioxd::server_type::ingester::Error),
Ingester(#[from] ioxd_ingester::Error),
#[error("error initializing compactor: {0}")]
Compactor(#[from] ioxd::server_type::compactor::Error),
Compactor(#[from] ioxd_compactor::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),


@ -10,8 +10,9 @@ use time::SystemProvider;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, run_config::RunConfig,
};
use ioxd::{self, server_type::compactor::create_compactor_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_compactor::create_compactor_server_type;
use super::main;
@ -33,7 +34,7 @@ pub enum Error {
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("error initializing compactor: {0}")]
Compactor(#[from] ioxd::server_type::compactor::Error),
Compactor(#[from] ioxd_compactor::Error),
}
#[derive(Debug, clap::Parser)]


@ -4,8 +4,8 @@ use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use data_types::boolean_flag::BooleanFlag;
use ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_database::{
setup::{make_application, make_server},
DatabaseServerType,


@ -4,8 +4,9 @@ use clap_blocks::{
catalog_dsn::CatalogDsnConfig, ingester::IngesterConfig, run_config::RunConfig,
write_buffer::WriteBufferConfig,
};
use ioxd::{self, server_type::ingester::create_ingester_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_ingester::create_ingester_server_type;
use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use query::exec::Executor;
@ -26,7 +27,7 @@ pub enum Error {
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("error initializing ingester: {0}")]
Ingester(#[from] ioxd::server_type::ingester::Error),
Ingester(#[from] ioxd_ingester::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),


@ -1,4 +1,4 @@
use ioxd::Service;
use ioxd_common::Service;
use ioxd_common::{grpc_listener, http_listener, serve, server_type::CommonServerState};
use observability_deps::tracing::{error, info};
use panic_logging::SendPanicsToTracing;


@ -8,8 +8,8 @@ use thiserror::Error;
use time::SystemProvider;
use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig};
use ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_querier::create_querier_server_type;
use super::main;


@ -7,8 +7,8 @@ use clap_blocks::run_config::RunConfig;
use data_types::router::Router as RouterConfig;
use generated_types::{google::FieldViolation, influxdata::iox::router::v1::RouterConfigFile};
use ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_router::RouterServerType;
use observability_deps::tracing::warn;
use router::{resolver::RemoteTemplate, server::RouterServer};


@ -6,8 +6,8 @@ use clap_blocks::{
catalog_dsn::CatalogDsnConfig, run_config::RunConfig, write_buffer::WriteBufferConfig,
};
use ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_router2::create_router2_server_type;
use observability_deps::tracing::*;
use thiserror::Error;


@ -3,12 +3,9 @@
use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use ioxd::{
self,
server_type::test::{TestAction, TestServerType},
Service,
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_test::{TestAction, TestServerType};
use metric::Registry;
use thiserror::Error;


@ -1,8 +1,8 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
maybe_skip_integration, query_until_results, rand_name, write_to_router, ServerFixture,
TestConfig,
get_write_token, maybe_skip_integration, query_when_readable, rand_name, write_to_router,
ServerFixture, TestConfig,
};
#[tokio::test]
@ -26,12 +26,14 @@ async fn smoke() {
let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let write_token = get_write_token(&response);
// run query in a loop until the data becomes available
let sql = format!("select * from {}", table_name);
let batches = query_until_results(
let batches = query_when_readable(
sql,
namespace,
write_token,
all_in_one.server().querier_grpc_connection(),
)
.await;


@ -1,7 +1,7 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
maybe_skip_integration, query_until_results, MiniCluster, TestConfig,
get_write_token, maybe_skip_integration, query_when_readable, MiniCluster, TestConfig,
};
#[tokio::test]
@ -28,11 +28,16 @@ async fn basic_on_parquet() {
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// assert that the response contains a write token
let write_token = get_write_token(&response);
assert!(!write_token.is_empty());
// run query in a loop until the data becomes available
let sql = format!("select * from {}", table_name);
let batches = query_until_results(
let batches = query_when_readable(
sql,
cluster.namespace(),
write_token,
cluster.querier().querier_grpc_connection(),
)
.await;


@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
sqlparser = "0.15.0"
sqlparser = "0.16.0"
snafu = "0.7.0"
generated_types = { path = "../generated_types" }


@ -24,7 +24,7 @@ workspace-hack = { path = "../workspace-hack"}
dotenv = "0.15.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
paste = "1.0.7"
pretty_assertions = "1.2.0"
pretty_assertions = "1.2.1"
rand = "0.8"
tempfile = "3"
test_helpers = { path = "../test_helpers" }


@ -0,0 +1,2 @@
-- Avoid seqscan when filtering columns by their table ID.
CREATE INDEX IF NOT EXISTS column_name_table_idx ON column_name (table_id);


@ -36,7 +36,7 @@ trogging = { path = "../trogging", default-features = false, features = ["clap"]
[build-dependencies]
glob = "0.3.0"
pbjson-build = "0.2"
pbjson-build = "0.3"
tonic-build = "0.6"
[dev-dependencies]


@ -1,66 +0,0 @@
[package]
name = "ioxd"
version = "0.1.0"
edition = "2021"
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
compactor = { path = "../compactor" }
data_types2 = { path = "../data_types2" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
ingester = { path = "../ingester" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch = { path = "../mutable_batch" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
query = { path = "../query" }
router = { path = "../router" }
router2 = { path = "../router2" }
server = { path = "../server" }
service_common = { path = "../service_common" }
service_grpc_flight = { path = "../service_grpc_flight" }
service_grpc_influxrpc = { path = "../service_grpc_influxrpc" }
service_grpc_testing = { path = "../service_grpc_testing" }
time = { path = "../time" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }
write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order
ansi_term = "0.12"
arrow-flight = "11"
async-trait = "0.1"
clap = { version = "3", features = ["derive", "env"] }
futures = "0.3"
hashbrown = "0.12"
http = "0.2.0"
hyper = "0.14"
num_cpus = "1.13.0"
pprof = { version = "0.7", default-features = false, features = ["flamegraph", "prost-codec"], optional = true }
snafu = "0.7"
thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.1" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
test_helpers = { path = "../test_helpers" }
iox_tests = { path = "../iox_tests" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.2"
tempfile = "3.1.0"


@ -1,3 +0,0 @@
pub mod compactor;
pub mod ingester;
pub mod test;


@ -1,6 +1,9 @@
pub mod http;
pub mod rpc;
pub mod server_type;
mod service;
pub use service::Service;
use crate::server_type::{CommonServerState, ServerType};
use futures::{future::FusedFuture, pin_mut, FutureExt};


@ -1,9 +1,8 @@
use std::sync::Arc;
use clap_blocks::{run_config::RunConfig, socket_addr::SocketAddr};
use ioxd_common::server_type::ServerType;
pub mod server_type;
use crate::server_type::ServerType;
/// A service that will start on the specified addresses
pub struct Service {

ioxd_compactor/Cargo.toml (new file)

@ -0,0 +1,34 @@
[package]
name = "ioxd_compactor"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
compactor = { path = "../compactor" }
data_types2 = { path = "../data_types2" }
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
query = { path = "../query" }
object_store = { path = "../object_store" }
service_grpc_testing = { path = "../service_grpc_testing" }
time = { path = "../time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
hyper = "0.14"
thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.1" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
workspace-hack = { path = "../workspace-hack"}

ioxd_ingester/Cargo.toml (new file)

@ -0,0 +1,35 @@
[package]
name = "ioxd_ingester"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types2 = { path = "../data_types2" }
generated_types = { path = "../generated_types" }
ingester = { path = "../ingester" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
query = { path = "../query" }
service_grpc_testing = { path = "../service_grpc_testing" }
time = { path = "../time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
hyper = "0.14"
thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.1" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
workspace-hack = { path = "../workspace-hack"}


@ -22,6 +22,7 @@ time = { path = "../time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
# Crates.io dependencies, in alphabetical order
arrow-flight = "11"


@ -17,16 +17,17 @@ use router2::{
dml_handlers::{
DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use thiserror::Error;
use write_summary::WriteSummary;
use ioxd_common::{
add_service,
@ -71,7 +72,7 @@ impl<D> RouterServerType<D> {
#[async_trait]
impl<D> ServerType for RouterServerType<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>> + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Return the [`metric::Registry`] used by the router.
fn metric_registry(&self) -> Arc<Registry> {
@ -233,6 +234,8 @@ pub async fn create_router2_server_type(
//
////////////////////////////////////////////////////////////////////////////
let parallel_write = WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer));
// Build the chain of DML handlers that forms the request processing
// pipeline, starting with the namespace creator (for testing purposes) and
// write partitioner that yields a set of partitioned batches.
@ -248,7 +251,7 @@ pub async fn create_router2_server_type(
.and_then(InstrumentationDecorator::new(
"parallel_write",
&*metrics,
FanOutAdaptor::new(write_buffer),
parallel_write,
));
// Record the overall request handling latency

ioxd_test/Cargo.toml (new file)

@ -0,0 +1,29 @@
[package]
name = "ioxd_test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
generated_types = { path = "../generated_types" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
service_grpc_testing = { path = "../service_grpc_testing" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
clap = { version = "3", features = ["derive", "env"] }
hyper = "0.14"
snafu = "0.7"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.1" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
workspace-hack = { path = "../workspace-hack"}


@ -17,7 +17,7 @@ regex-syntax = "0.6.25"
schema = { path = "../schema" }
serde_json = "1.0.79"
snafu = "0.7"
sqlparser = "0.15.0"
sqlparser = "0.16.0"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]


@ -35,13 +35,14 @@ tonic = "0.6"
trace = { path = "../trace/" }
workspace-hack = { path = "../workspace-hack"}
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
[dev-dependencies]
assert_matches = "1.5"
criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
lazy_static = "1.4.0"
paste = "1.0.7"
pretty_assertions = "1.2.0"
pretty_assertions = "1.2.1"
rand = "0.8.3"
schema = { path = "../schema" }


@ -7,6 +7,7 @@ use iox_catalog::{interface::Catalog, mem::MemCatalog};
use router2::{
dml_handlers::{
DmlHandlerChainExt, FanOutAdaptor, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
@ -66,8 +67,9 @@ fn e2e_benchmarks(c: &mut Criterion) {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack =
schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer)));
let handler_stack = schema_validator.and_then(
partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))),
);
HttpDelegate::new(1024, Arc::new(handler_stack), &metrics)
};


@ -2,7 +2,7 @@ use super::DmlHandler;
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use futures::{stream::FuturesUnordered, TryStreamExt};
use std::{fmt::Debug, future, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData};
use trace::ctx::SpanContext;
/// A [`FanOutAdaptor`] takes an iterator of DML write operation inputs and
@ -38,7 +38,7 @@ where
U: Iterator<Item = T::WriteInput> + Send + Sync,
{
type WriteInput = I;
type WriteOutput = ();
type WriteOutput = Vec<T::WriteOutput>;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
@ -51,7 +51,7 @@ where
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
input
let results = input
.into_iter()
.map(|v| {
let namespace = namespace.clone();
@ -59,10 +59,9 @@ where
async move { self.inner.write(&namespace, v, span_ctx).await }
})
.collect::<FuturesUnordered<_>>()
.try_for_each(|_| future::ready(Ok(())))
.try_collect::<Vec<_>>()
.await?;
Ok(())
Ok(results)
}
/// Pass the delete through to the inner handler.


@ -165,6 +165,7 @@ mod tests {
use metric::Attributes;
use std::sync::Arc;
use trace::{span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use write_summary::WriteSummary;
const HANDLER_NAME: &str = "bananas";
@ -205,10 +206,14 @@ mod tests {
);
}
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_ok() {
let ns = "platanos".try_into().unwrap();
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let metrics = Arc::new(metric::Registry::default());
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));


@ -6,6 +6,7 @@ use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use parking_lot::Mutex;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
/// A captured call to a [`MockDmlHandler`], generic over `W`, the captured
/// [`DmlHandler::WriteInput`] type.
@ -25,7 +26,7 @@ pub enum MockDmlHandlerCall<W> {
#[derive(Debug)]
struct Inner<W> {
calls: Vec<MockDmlHandlerCall<W>>,
write_return: VecDeque<Result<(), DmlError>>,
write_return: VecDeque<Result<WriteSummary, DmlError>>,
delete_return: VecDeque<Result<(), DmlError>>,
}
@ -58,7 +59,10 @@ impl<W> MockDmlHandler<W>
where
W: Clone,
{
pub fn with_write_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
pub fn with_write_return(
self,
ret: impl Into<VecDeque<Result<WriteSummary, DmlError>>>,
) -> Self {
self.0.lock().write_return = ret.into();
self
}
@ -93,7 +97,7 @@ where
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = W;
type WriteOutput = ();
type WriteOutput = WriteSummary;
async fn write(
&self,


@ -99,5 +99,8 @@ pub use chain::*;
mod fan_out;
pub use fan_out::*;
mod write_summary;
pub use self::write_summary::*;
#[cfg(test)]
pub mod mock;


@ -11,7 +11,6 @@ use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use std::{
fmt::{Debug, Display},
future,
sync::Arc,
};
use thiserror::Error;
@ -81,7 +80,7 @@ where
type DeleteError = ShardError;
type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
type WriteOutput = ();
type WriteOutput = Vec<DmlMeta>;
/// Shard `writes` and dispatch the resultant DML operations.
async fn write(
@ -162,30 +161,37 @@ where
/// Enumerates all items in the iterator, maps each to a future that dispatches
/// the [`DmlOperation`] to its paired [`Sequencer`], executes all the futures
/// in parallel and gathers any errors.
async fn parallel_enqueue<T>(v: T) -> Result<(), ShardError>
///
/// Returns a list of the sequences that were written
async fn parallel_enqueue<T>(v: T) -> Result<Vec<DmlMeta>, ShardError>
where
T: Iterator<Item = (Arc<Sequencer>, DmlOperation)> + Send,
{
let mut successes = 0;
let errs = v
.map(|(sequencer, op)| async move {
let mut successes = vec![];
let mut errs = vec![];
v.map(|(sequencer, op)| async move {
tokio::spawn(async move { sequencer.enqueue(op).await })
.await
.expect("shard enqueue panic")
})
// Use FuturesUnordered so the futures can run in parallel
.collect::<FuturesUnordered<_>>()
.filter_map(|v| {
if v.is_ok() {
successes += 1;
}
future::ready(v.err())
})
.collect::<Vec<WriteBufferError>>()
.await;
.collect::<Vec<_>>()
.await
// Sort the result into successes/failures upon completion
.into_iter()
.for_each(|v| match v {
Ok(meta) => successes.push(meta),
Err(e) => errs.push(e),
});
match errs.len() {
0 => Ok(()),
_n => Err(ShardError::WriteBufferErrors { successes, errs }),
0 => Ok(successes),
_n => Err(ShardError::WriteBufferErrors {
successes: successes.len(),
errs,
}),
}
}


@ -0,0 +1,60 @@
use super::DmlHandler;
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use dml::DmlMeta;
use std::fmt::Debug;
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
/// A [`WriteSummaryAdapter`] wraps a DML handler that produces
/// `Vec<Vec<DmlMeta>>` for each write, and produces a [`WriteSummary`]
/// suitable for sending back to a client.
#[derive(Debug, Default)]
pub struct WriteSummaryAdapter<T> {
inner: T,
}
impl<T> WriteSummaryAdapter<T> {
/// Construct a [`WriteSummaryAdapter`] that passes DML operations
/// through to `inner`.
pub fn new(inner: T) -> Self {
Self { inner }
}
}
#[async_trait]
impl<T> DmlHandler for WriteSummaryAdapter<T>
where
T: DmlHandler<WriteOutput = Vec<Vec<DmlMeta>>>,
{
type WriteInput = T::WriteInput;
type WriteOutput = WriteSummary;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
/// Sends `input` to the inner handler, which returns a
/// `Vec<Vec<DmlMeta>>`, and creates a [`WriteSummary`] from it.
async fn write(
&self,
namespace: &DatabaseName<'static>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let metas = self.inner.write(namespace, input, span_ctx).await?;
Ok(WriteSummary::new(metas))
}
/// Pass the delete through to the inner handler.
async fn delete(
&self,
namespace: &DatabaseName<'static>,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, table_name, predicate, span_ctx)
.await
}
}


@ -15,6 +15,7 @@ use schema::selection::Selection;
use std::ops::DerefMut;
use tonic::{Request, Response, Status};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
@ -45,7 +46,7 @@ impl<D> GrpcDelegate<D> {
impl<D> GrpcDelegate<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>> + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Acquire a [`WriteService`] gRPC service implementation.
///
@ -159,6 +160,9 @@ where
"routing grpc write",
);
// TODO return the produced WriteSummary to the client
// https://github.com/influxdata/influxdb_iox/issues/4208
self.dml_handler
.write(&namespace, tables, span_ctx)
.await
@ -265,10 +269,14 @@ mod tests {
use super::*;
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_no_batch() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest::default();
@ -285,7 +293,7 @@ mod tests {
#[tokio::test]
async fn test_write_no_namespace() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
@ -307,7 +315,7 @@ mod tests {
#[tokio::test]
async fn test_write_ok() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {


@ -18,6 +18,9 @@ use serde::Deserialize;
use thiserror::Error;
use time::{SystemProvider, TimeProvider};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
const WRITE_TOKEN_HTTP_HEADER: &str = "X-IOx-Write-Token";
/// Errors returned by the `router2` HTTP request handler.
#[derive(Debug, Error)]
@ -263,7 +266,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
impl<D, T> HttpDelegate<D, T>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary>,
T: TimeProvider,
{
/// Routes `req` to the appropriate handler, if any, returning the handler
@ -274,10 +277,16 @@ where
(&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
_ => return Err(Error::NoHandler),
}
.map(|_| response_no_content())
.map(|summary| {
Response::builder()
.status(StatusCode::NO_CONTENT)
.header(WRITE_TOKEN_HTTP_HEADER, summary.to_token())
.body(Body::empty())
.unwrap()
})
}
async fn write_handler(&self, req: Request<Body>) -> Result<(), Error> {
async fn write_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let write_info = WriteInfo::try_from(&req)?;
@ -300,7 +309,7 @@ where
Ok(v) => v,
Err(mutable_batch_lp::Error::EmptyPayload) => {
debug!("nothing to write");
return Ok(());
return Ok(WriteSummary::default());
}
Err(e) => return Err(Error::ParseLineProtocol(e)),
};
@ -318,7 +327,8 @@ where
"routing write",
);
self.dml_handler
let summary = self
.dml_handler
.write(&namespace, batches, span_ctx)
.await
.map_err(Into::into)?;
@ -328,10 +338,10 @@ where
self.write_metric_tables.inc(num_tables as _);
self.write_metric_body_size.inc(body.len() as _);
Ok(())
Ok(summary)
}
async fn delete_handler(&self, req: Request<Body>) -> Result<(), Error> {
async fn delete_handler(&self, req: Request<Body>) -> Result<WriteSummary, Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let account = WriteInfo::try_from(&req)?;
@ -376,7 +386,9 @@ where
self.delete_metric_body_size.inc(body.len() as _);
Ok(())
// TODO pass back write summaries for deletes as well
// https://github.com/influxdata/influxdb_iox/issues/4209
Ok(WriteSummary::default())
}
/// Parse the request's body into raw bytes, applying the configured size
@ -437,13 +449,6 @@ where
}
}
fn response_no_content() -> Response<Body> {
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap()
}
#[cfg(test)]
mod tests {
use std::{io::Write, iter, sync::Arc};
@ -460,6 +465,10 @@ mod tests {
const MAX_BYTES: usize = 1024;
fn summary() -> WriteSummary {
WriteSummary::default()
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option<u64>) {
let counter = metrics
.get_instrument::<Metric<U64Counter>>(name)
@ -639,7 +648,7 @@ mod tests {
ok,
query_string = "?org=bananas&bucket=test",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, "bananas_test");
@ -650,7 +659,7 @@ mod tests {
ok_precision_s,
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -665,7 +674,7 @@ mod tests {
ok_precision_ms,
query_string = "?org=bananas&bucket=test&precision=ms",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -680,7 +689,7 @@ mod tests {
ok_precision_us,
query_string = "?org=bananas&bucket=test&precision=us",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -695,7 +704,7 @@ mod tests {
ok_precision_ns,
query_string = "?org=bananas&bucket=test&precision=ns",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Ok(_),
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, "bananas_test");
@ -711,7 +720,7 @@ mod tests {
// SECONDS, so multiplies the provided timestamp by 1,000,000,000
query_string = "?org=bananas&bucket=test&precision=s",
body = "platanos,tag1=A,tag2=B val=42i 1647622847000000000".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = []
);
@ -720,7 +729,7 @@ mod tests {
no_query_params,
query_string = "",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)),
want_dml_calls = [] // None
);
@ -729,7 +738,7 @@ mod tests {
no_org_bucket,
query_string = "?",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::DecodeFail(_))),
want_dml_calls = [] // None
);
@ -738,7 +747,7 @@ mod tests {
empty_org_bucket,
query_string = "?org=&bucket=",
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)),
want_dml_calls = [] // None
);
@ -747,7 +756,7 @@ mod tests {
invalid_org_bucket,
query_string = format!("?org=test&bucket={}", "A".repeat(1000)),
body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::InvalidOrgBucket(OrgBucketError::MappingFail(_))),
want_dml_calls = [] // None
);
@ -756,7 +765,7 @@ mod tests {
invalid_line_protocol,
query_string = "?org=bananas&bucket=test",
body = "not line protocol".as_bytes(),
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::ParseLineProtocol(_)),
want_dml_calls = [] // None
);
@ -765,7 +774,7 @@ mod tests {
non_utf8_body,
query_string = "?org=bananas&bucket=test",
body = vec![0xc3, 0x28],
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::NonUtf8Body(_)),
want_dml_calls = [] // None
);
@ -793,7 +802,7 @@ mod tests {
.flat_map(|s| s.bytes())
.collect::<Vec<u8>>()
},
dml_handler = [Ok(())],
dml_handler = [Ok(summary())],
want_result = Err(Error::RequestSizeExceeded(_)),
want_dml_calls = [] // None
);


@ -10,7 +10,7 @@ use router2::{
dml_handlers::{
Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator,
NamespaceAutocreation, Partitioned, Partitioner, SchemaError, SchemaValidator,
ShardedWriteBuffer,
ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
@ -55,12 +55,14 @@ type HttpDelegateStack = HttpDelegate<
>,
Partitioner,
>,
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>,
Vec<Partitioned<HashMap<String, MutableBatch>>>,
>,
>,
>,
>,
>;
/// A [`router2`] stack configured with the various DML handlers using mock
@ -109,7 +111,9 @@ impl TestContext {
let handler_stack = ns_creator
.and_then(schema_validator)
.and_then(partitioner)
.and_then(FanOutAdaptor::new(sharded_write_buffer));
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);


@ -34,20 +34,41 @@ pub async fn write_to_router(
.expect("http error sending write")
}
/// Runs a query using the flight API on the specified connection
/// until responses are produced
///
/// The retry loop is used to wait for writes to become visible
/// Extracts the write token from the specified response (to the /api/v2/write api)
pub fn get_write_token(response: &Response<Body>) -> String {
let message = format!("no write token in {:?}", response);
response
.headers()
.get("X-IOx-Write-Token")
.expect(&message)
.to_str()
.expect("Value not a string")
.to_string()
}
const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;
pub async fn query_until_results(
/// Runs a query using the flight API on the specified connection
/// until responses are produced.
///
/// (Will) eventually wait until data from the specified write token
/// is readable; currently it just retries until any results are produced.
///
/// The retry loop is used to wait for writes to become visible
pub async fn query_when_readable(
sql: impl Into<String>,
namespace: impl Into<String>,
write_token: impl Into<String>,
connection: Connection,
) -> Vec<RecordBatch> {
let namespace = namespace.into();
let sql = sql.into();
println!(
"(TODO) Waiting for Write Token to be visible {}",
write_token.into()
);
let mut client = influxdb_iox_client::flight::Client::new(connection);
// This does nothing except test the client handshake implementation.

write_summary/Cargo.toml (new file)

@ -0,0 +1,26 @@
[package]
name = "write_summary"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order
base64 = "0.13"
serde_json = "1.0.79"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
data_types = { path = "../data_types" }
time = { path = "../time" }
# Crates.io dependencies, in alphabetical order

write_summary/src/lib.rs (new file)

@ -0,0 +1,199 @@
use std::collections::BTreeMap;
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
use dml::DmlMeta;
/// Contains information about a single write.
///
/// A single write consisting of multiple lines of line protocol
/// formatted data is sharded and partitioned across potentially
/// several sequencers; the pieces are then processed by the ingester
/// and become readable at potentially different times.
///
/// This struct contains sufficient information to determine the
/// current state of the write as a whole
///
/// Summary of a `Vec<Vec<DmlMeta>>`.
#[derive(Debug, Default)]
pub struct WriteSummary {
metas: Vec<Vec<DmlMeta>>,
}
impl WriteSummary {
pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
Self { metas }
}
/// Return an opaque "token" for this summary
pub fn to_token(self) -> String {
let proto_write_summary: proto::WriteSummary = self.into();
base64::encode(
serde_json::to_string(&proto_write_summary)
.expect("unexpected error serializing token to json"),
)
}
}
impl From<WriteSummary> for proto::WriteSummary {
fn from(summary: WriteSummary) -> Self {
// create a map from sequencer_id to sequences
let sequences = summary
.metas
.iter()
.flat_map(|v| v.iter())
.filter_map(|meta| meta.sequence());
// Use BTreeMap to ensure consistent output
let mut sequencers = BTreeMap::new();
for s in sequences {
sequencers
.entry(s.sequencer_id)
.or_insert_with(Vec::new)
.push(s.sequence_number)
}
let sequencers = sequencers
.into_iter()
.map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite {
sequencer_id,
sequence_numbers,
})
.collect();
Self { sequencers }
}
}
#[cfg(test)]
mod tests {
use super::*;
use data_types::sequence::Sequence;
#[test]
fn empty() {
let metas = vec![];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary { sequencers: vec![] };
assert_eq!(summary, expected);
}
#[test]
fn one() {
let metas = vec![vec![make_meta(Sequence::new(1, 2))]];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
sequencers: vec![proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2],
}],
};
assert_eq!(summary, expected);
}
#[test]
fn many() {
let metas = vec![
vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(10, 20)),
],
vec![make_meta(Sequence::new(1, 3))],
];
let summary: proto::WriteSummary = WriteSummary::new(metas).into();
let expected = proto::WriteSummary {
sequencers: vec![
proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2, 3],
},
proto::SequencerWrite {
sequencer_id: 10,
sequence_numbers: vec![20],
},
],
};
assert_eq!(summary, expected);
}
#[test]
fn different_order() {
// order in sequences shouldn't matter
let metas1 = vec![vec![
make_meta(Sequence::new(1, 2)),
make_meta(Sequence::new(2, 3)),
]];
// order in sequences shouldn't matter
let metas2 = vec![vec![
make_meta(Sequence::new(2, 3)),
make_meta(Sequence::new(1, 2)),
]];
let summary1: proto::WriteSummary = WriteSummary::new(metas1).into();
let summary2: proto::WriteSummary = WriteSummary::new(metas2).into();
let expected = proto::WriteSummary {
sequencers: vec![
proto::SequencerWrite {
sequencer_id: 1,
sequence_numbers: vec![2],
},
proto::SequencerWrite {
sequencer_id: 2,
sequence_numbers: vec![3],
},
],
};
assert_eq!(summary1, expected);
assert_eq!(summary2, expected);
}
#[test]
fn token_creation() {
let metas = vec![vec![make_meta(Sequence::new(1, 2))]];
let summary = WriteSummary::new(metas.clone());
let summary_copy = WriteSummary::new(metas);
let metas2 = vec![vec![make_meta(Sequence::new(2, 3))]];
let summary2 = WriteSummary::new(metas2);
let token = summary.to_token();
// non empty
assert!(!token.is_empty());
// same when created with same metas
assert_eq!(token, summary_copy.to_token());
// different when created with different metas
assert_ne!(token, summary2.to_token());
assert!(
!token.contains("sequenceNumbers"),
"token not obscured: {}",
token
);
assert!(
!token.contains("sequencers"),
"token not obscured: {}",
token
);
}
fn make_meta(s: Sequence) -> DmlMeta {
use time::TimeProvider;
let time_provider = time::SystemProvider::new();
let span_context = None;
let bytes_read = 132;
DmlMeta::sequenced(s, time_provider.now(), span_context, bytes_read)
}
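
Since `to_token` emits base64-encoded JSON of the protobuf summary, the inverse is straightforward. A minimal sketch, not part of this commit, assuming the pbjson-generated `Deserialize` impl on `proto::WriteSummary` (which is what the `pbjson`/`pbjson-build` 0.3 serde codegen above provides):

// Hypothetical inverse of `WriteSummary::to_token`: recover the protobuf
// summary from an opaque token, e.g. for a server-side readability check.
fn summary_from_token(token: &str) -> proto::WriteSummary {
    let json = base64::decode(token).expect("token is valid base64");
    serde_json::from_slice(&json).expect("token payload is valid JSON")
}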
}