From 9df4131e6066b1bd0ad456c72d03241117696e97 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Thu, 11 Mar 2021 16:23:22 +0100
Subject: [PATCH 1/9] feat: Add server remote [set|remove|list] commands

---
 Cargo.lock                               |  1 +
 Cargo.toml                               |  2 +
 src/commands/server.rs                   | 10 +++-
 src/commands/server_remote.rs            | 76 ++++++++++++++++++++++++
 src/main.rs                              |  3 +-
 tests/end_to_end_cases/management_cli.rs | 59 ++++++++++++++++++
 6 files changed, 147 insertions(+), 4 deletions(-)
 create mode 100644 src/commands/server_remote.rs

diff --git a/Cargo.lock b/Cargo.lock
index 50de29dc12..e1101ea91b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1507,6 +1507,7 @@ dependencies = [
  "panic_logging",
  "parking_lot",
  "predicates",
+ "prettytable-rs",
  "prost",
  "query",
  "rand 0.7.3",
diff --git a/Cargo.toml b/Cargo.toml
index 47a3118d39..efd5b79e1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -70,6 +70,8 @@ http = "0.2.0"
 hyper = "0.14"
 opentelemetry = { version = "0.12", default-features = false, features = ["trace", "tokio-support"] }
 opentelemetry-jaeger = { version = "0.11", features = ["tokio"] }
+# used by arrow/datafusion anyway
+prettytable-rs = "0.8"
 prost = "0.7"
 # Forked to upgrade hyper and tokio
 routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
diff --git a/src/commands/server.rs b/src/commands/server.rs
index de8afcf86a..d270bf2969 100644
--- a/src/commands/server.rs
+++ b/src/commands/server.rs
@@ -1,7 +1,7 @@
 //! Implementation of command line option for manipulating and showing server
 //! config
 
-use crate::commands::logging::LoggingLevel;
+use crate::commands::{logging::LoggingLevel, server_remote};
 use crate::influxdb_ioxd;
 use clap::arg_enum;
 use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
@@ -20,8 +20,10 @@ pub const FALLBACK_AWS_REGION: &str = "us-east-1";
 
 #[derive(Debug, Error)]
 pub enum Error {
-    #[error("Server error: {0}")]
+    #[error("Run: {0}")]
     ServerError(#[from] influxdb_ioxd::Error),
+    #[error("Remote: {0}")]
+    RemoteError(#[from] server_remote::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -30,6 +32,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[structopt(name = "server", about = "IOx server commands")]
 pub enum Config {
     Run(RunConfig),
+    Remote(crate::commands::server_remote::Config),
 }
 
 #[derive(Debug, StructOpt)]
@@ -231,9 +234,10 @@ Possible values (case insensitive):
     pub jaeger_host: Option<String>,
 }
 
-pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> {
+pub async fn command(logging_level: LoggingLevel, url: String, config: Config) -> Result<()> {
     match config {
         Config::Run(config) => Ok(influxdb_ioxd::main(logging_level, config).await?),
+        Config::Remote(config) => Ok(server_remote::command(url, config).await?),
     }
 }
diff --git a/src/commands/server_remote.rs b/src/commands/server_remote.rs
new file mode 100644
index 0000000000..5a6c4fef1c
--- /dev/null
+++ b/src/commands/server_remote.rs
@@ -0,0 +1,76 @@
+use influxdb_iox_client::{connection::Builder, management};
+use structopt::StructOpt;
+use thiserror::Error;
+
+use prettytable::{format, Cell, Row, Table};
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Error connecting to IOx: {0}")]
+    ConnectionError(#[from] influxdb_iox_client::connection::Error),
+
+    #[error("Update remote error: {0}")]
+    UpdateError(#[from] management::UpdateRemoteError),
+
+    #[error("List remote error: {0}")]
+    ListError(#[from] management::ListRemotesError),
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, StructOpt)]
+#[structopt(
+    name = "remote",
+    about = "Manage configuration about other IOx servers"
+)]
+pub enum Config {
+    /// Set connection parameters for a remote IOx server.
+    Set { id: u32, connection_string: String },
+    /// Remove a reference to a remote IOx server.
+    Remove { id: u32 },
+    /// List configured remote IOx server.
+    List,
+}
+
+pub async fn command(url: String, config: Config) -> Result<()> {
+    let connection = Builder::default().build(url).await?;
+
+    match config {
+        Config::Set {
+            id,
+            connection_string,
+        } => {
+            let mut client = management::Client::new(connection);
+            client.update_remote(id, connection_string).await?;
+        }
+        Config::Remove { id } => {
+            let mut client = management::Client::new(connection);
+            client.delete_remote(id).await?;
+        }
+        Config::List => {
+            let mut client = management::Client::new(connection);
+
+            let remotes = client.list_remotes().await?;
+            if remotes.is_empty() {
+                println!("no remotes configured");
+            } else {
+                let mut table = Table::new();
+                table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
+                table.set_titles(Row::new(vec![
+                    Cell::new("ID"),
+                    Cell::new("Connection string"),
+                ]));
+
+                for i in remotes {
+                    table.add_row(Row::new(vec![
+                        Cell::new(&format!("{}", i.id)),
+                        Cell::new(&i.connection_string),
+                    ]));
+                }
+                print!("{}", table);
+            }
+        }
+    };
+
+    Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
index 0f9d1d601d..3d7e10b201 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,6 +24,7 @@ mod commands {
     pub mod logging;
     pub mod meta;
     pub mod server;
+    pub mod server_remote;
     pub mod stats;
     pub mod writer;
 }
@@ -185,7 +186,7 @@ fn main() -> Result<(), std::io::Error> {
         Command::Server(config) => {
             // Note don't set up basic logging here, different logging rules apply in server
             // mode
-            if let Err(e) = commands::server::command(logging_level, *config).await {
+            if let Err(e) = commands::server::command(logging_level, host, *config).await {
                 eprintln!("Server command failed: {}", e);
                 std::process::exit(ReturnCode::Failure as _)
             }
diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs
index c28c5fe56a..81532fbf3b 100644
--- a/tests/end_to_end_cases/management_cli.rs
+++ b/tests/end_to_end_cases/management_cli.rs
@@ -10,6 +10,7 @@ pub async fn test() {
 
     test_writer_id(addr).await;
     test_create_database(addr).await;
+    test_remotes(addr).await;
 }
 
 async fn test_writer_id(addr: &str) {
@@ -81,3 +82,61 @@ async fn test_create_database(addr: &str) {
         .success()
         .stdout(predicate::str::contains(format!("name: \"{}\"", db)));
 }
+
+async fn test_remotes(addr: &str) {
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("server")
+        .arg("remote")
+        .arg("list")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .success()
+        .stdout(predicate::str::contains("no remotes configured"));
+
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("server")
+        .arg("remote")
+        .arg("set")
+        .arg("1")
+        .arg("http://1.2.3.4:1234")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .success();
+
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("server")
+        .arg("remote")
+        .arg("list")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .success()
+        .stdout(predicate::str::contains("http://1.2.3.4:1234"));
+
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("server")
+        .arg("remote")
+        .arg("remove")
+        .arg("1")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .success();
+
+    Command::cargo_bin("influxdb_iox")
+        .unwrap()
+        .arg("server")
+        .arg("remote")
+        .arg("list")
+        .arg("--host")
+        .arg(addr)
+        .assert()
+        .success()
+        .stdout(predicate::str::contains("no remotes configured"));
+}

From 6ac7e2c1a77cd5d1ac3e93df01c95efeeb48cdff Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Mar 2021 08:56:14 -0500
Subject: [PATCH 2/9] feat: Add management API and CLI to list chunks (#968)

* feat: Add management API and CLI to list chunks

* fix: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* fix: add comment to protobuf

* fix: fix comment

* fix: fmt, fixup merge errors

* fix: fascinating type dance with prost generated types

* fix: clippy

* fix: move command to influxdb_iox database chunk list

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---
 data_types/src/chunk.rs                       | 176 ++++++++++++++++++
 data_types/src/lib.rs                         |   1 +
 generated_types/build.rs                      |   1 +
 .../influxdata/iox/management/v1/chunk.proto  |  37 ++++
 .../iox/management/v1/service.proto           |  14 ++
 .../influxdata/iox/write/v1/service.proto     |   2 +-
 influxdb_iox_client/src/client/management.rs  |  23 +++
 influxdb_iox_client/src/client/write.rs       |   6 +-
 mutable_buffer/src/database.rs                |   8 +
 mutable_buffer/src/partition.rs               |   5 +
 query/src/lib.rs                              |   8 +-
 query/src/test.rs                             |   4 +
 read_buffer/src/lib.rs                        |  29 +++
 server/src/db.rs                              | 104 ++++++++++-
 server/src/db/chunk.rs                        |  83 +++++++--
 server/src/snapshot.rs                        |   2 +-
 src/commands/database.rs                      |  12 +-
 src/commands/database/chunk.rs                |  73 ++++++++
 src/influxdb_ioxd/rpc/error.rs                |  29 ++-
 src/influxdb_ioxd/rpc/management.rs           |  37 +++-
 src/influxdb_ioxd/rpc/write.rs                |   6 +-
 tests/end_to_end_cases/management_api.rs      | 128 +++++++++++--
 tests/end_to_end_cases/management_cli.rs      | 101 ++++++++--
 tests/end_to_end_cases/util.rs                |  51 +++++
 tests/end_to_end_cases/write_api.rs           |  15 +-
 25 files changed, 879 insertions(+), 76 deletions(-)
 create mode 100644 data_types/src/chunk.rs
 create mode 100644 generated_types/protos/influxdata/iox/management/v1/chunk.proto
 create mode 100644 src/commands/database/chunk.rs

diff --git a/data_types/src/chunk.rs b/data_types/src/chunk.rs
new file mode 100644
index 0000000000..b68d618b5c
--- /dev/null
+++ b/data_types/src/chunk.rs
@@ -0,0 +1,176 @@
+//! Module contains a representation of chunk metadata
+use std::{convert::TryFrom, sync::Arc};
+
+use crate::field_validation::FromField;
+use generated_types::{google::FieldViolation, influxdata::iox::management::v1 as management};
+use serde::{Deserialize, Serialize};
+
+/// Which storage system is a chunk located in?
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
+pub enum ChunkStorage {
+    /// The chunk is still open for new writes, in the Mutable Buffer
+    OpenMutableBuffer,
+
+    /// The chunk is no longer open for writes, in the Mutable Buffer
+    ClosedMutableBuffer,
+
+    /// The chunk is in the Read Buffer (where it can not be mutated)
+    ReadBuffer,
+
+    /// The chunk is stored in Object Storage (where it can not be mutated)
+    ObjectStore,
+}
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
+/// Represents metadata about a chunk in a database.
+/// A chunk can contain one or more tables.
+pub struct ChunkSummary {
+    /// The partitition key of this chunk
+    pub partition_key: Arc<String>,
+
+    /// The id of this chunk
+    pub id: u32,
+
+    /// How is this chunk stored?
+ pub storage: ChunkStorage, + + /// The total estimated size of this chunk, in bytes + pub estimated_bytes: usize, +} + +/// Conversion code to management API chunk structure +impl From for management::Chunk { + fn from(summary: ChunkSummary) -> Self { + let ChunkSummary { + partition_key, + id, + storage, + estimated_bytes, + } = summary; + + let storage: management::ChunkStorage = storage.into(); + let storage = storage.into(); // convert to i32 + + let estimated_bytes = estimated_bytes as u64; + + let partition_key = match Arc::try_unwrap(partition_key) { + // no one else has a reference so take the string + Ok(partition_key) => partition_key, + // some other refernece exists to this string, so clone it + Err(partition_key) => partition_key.as_ref().clone(), + }; + + Self { + partition_key, + id, + storage, + estimated_bytes, + } + } +} + +impl From for management::ChunkStorage { + fn from(storage: ChunkStorage) -> Self { + match storage { + ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer, + ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer, + ChunkStorage::ReadBuffer => Self::ReadBuffer, + ChunkStorage::ObjectStore => Self::ObjectStore, + } + } +} + +/// Conversion code from management API chunk structure +impl TryFrom for ChunkSummary { + type Error = FieldViolation; + + fn try_from(proto: management::Chunk) -> Result { + // Use prost enum conversion + let storage = proto.storage().scope("storage")?; + + let management::Chunk { + partition_key, + id, + estimated_bytes, + .. + } = proto; + + let estimated_bytes = estimated_bytes as usize; + let partition_key = Arc::new(partition_key); + + Ok(Self { + partition_key, + id, + storage, + estimated_bytes, + }) + } +} + +impl TryFrom for ChunkStorage { + type Error = FieldViolation; + + fn try_from(proto: management::ChunkStorage) -> Result { + match proto { + management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer), + management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer), + management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer), + management::ChunkStorage::ObjectStore => Ok(Self::ObjectStore), + management::ChunkStorage::Unspecified => Err(FieldViolation::required("")), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn valid_proto_to_summary() { + let proto = management::Chunk { + partition_key: "foo".to_string(), + id: 42, + estimated_bytes: 1234, + storage: management::ChunkStorage::ObjectStore.into(), + }; + + let summary = ChunkSummary::try_from(proto).expect("conversion successful"); + let expected = ChunkSummary { + partition_key: Arc::new("foo".to_string()), + id: 42, + estimated_bytes: 1234, + storage: ChunkStorage::ObjectStore, + }; + + assert_eq!( + summary, expected, + "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", + summary, expected + ); + } + + #[test] + fn valid_summary_to_proto() { + let summary = ChunkSummary { + partition_key: Arc::new("foo".to_string()), + id: 42, + estimated_bytes: 1234, + storage: ChunkStorage::ObjectStore, + }; + + let proto = management::Chunk::try_from(summary).expect("conversion successful"); + + let expected = management::Chunk { + partition_key: "foo".to_string(), + id: 42, + estimated_bytes: 1234, + storage: management::ChunkStorage::ObjectStore.into(), + }; + + assert_eq!( + proto, expected, + "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", + proto, expected + ); + } +} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 9b294abaab..c7edc30360 100644 --- a/data_types/src/lib.rs +++ 
b/data_types/src/lib.rs @@ -20,6 +20,7 @@ pub const TABLE_NAMES_COLUMN_NAME: &str = "table"; /// `column_names`. pub const COLUMN_NAMES_COLUMN_NAME: &str = "column"; +pub mod chunk; pub mod data; pub mod database_rules; pub mod error; diff --git a/generated_types/build.rs b/generated_types/build.rs index 74573e028d..7e66a0f15f 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -39,6 +39,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { idpe_path.join("source.proto"), management_path.join("base_types.proto"), management_path.join("database_rules.proto"), + management_path.join("chunk.proto"), management_path.join("service.proto"), write_path.join("service.proto"), root.join("grpc/health/v1/service.proto"), diff --git a/generated_types/protos/influxdata/iox/management/v1/chunk.proto b/generated_types/protos/influxdata/iox/management/v1/chunk.proto new file mode 100644 index 0000000000..23b30317b2 --- /dev/null +++ b/generated_types/protos/influxdata/iox/management/v1/chunk.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +package influxdata.iox.management.v1; + + + // Which storage system is a chunk located in? +enum ChunkStorage { + // Not currently returned + CHUNK_STORAGE_UNSPECIFIED = 0; + + // The chunk is still open for new writes, in the Mutable Buffer + CHUNK_STORAGE_OPEN_MUTABLE_BUFFER = 1; + + // The chunk is no longer open for writes, in the Mutable Buffer + CHUNK_STORAGE_CLOSED_MUTABLE_BUFFER = 2; + + // The chunk is in the Read Buffer (where it can not be mutated) + CHUNK_STORAGE_READ_BUFFER = 3; + + // The chunk is stored in Object Storage (where it can not be mutated) + CHUNK_STORAGE_OBJECT_STORE = 4; +} + +// `Chunk` represents part of a partition of data in a database. +// A chunk can contain one or more tables. +message Chunk { + // The partitition key of this chunk + string partition_key = 1; + + // The id of this chunk + uint32 id = 2; + + // Which storage system the chunk is located in + ChunkStorage storage = 3; + + // The total estimated size of this chunk, in bytes + uint64 estimated_bytes = 4; +} diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index f8fb6edd1e..e09f9d652f 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -3,6 +3,7 @@ package influxdata.iox.management.v1; import "google/protobuf/empty.proto"; import "influxdata/iox/management/v1/database_rules.proto"; +import "influxdata/iox/management/v1/chunk.proto"; service ManagementService { rpc GetWriterId(GetWriterIdRequest) returns (GetWriterIdResponse); @@ -15,6 +16,9 @@ service ManagementService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); + // List chunks available on this database + rpc ListChunks(ListChunksRequest) returns (ListChunksResponse); + // List remote IOx servers we know about. 
rpc ListRemotes(ListRemotesRequest) returns (ListRemotesResponse); @@ -57,6 +61,16 @@ message CreateDatabaseRequest { message CreateDatabaseResponse {} +message ListChunksRequest { + // the name of the database + string db_name = 1; +} + +message ListChunksResponse { + repeated Chunk chunks = 1; +} + + message ListRemotesRequest {} message ListRemotesResponse { diff --git a/generated_types/protos/influxdata/iox/write/v1/service.proto b/generated_types/protos/influxdata/iox/write/v1/service.proto index 9476204780..c0fa0a9cbf 100644 --- a/generated_types/protos/influxdata/iox/write/v1/service.proto +++ b/generated_types/protos/influxdata/iox/write/v1/service.proto @@ -9,7 +9,7 @@ service WriteService { message WriteRequest { // name of database into which to write - string name = 1; + string db_name = 1; // data, in [LineProtocol] format // diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index df98aaf891..10b0c5bd6a 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -80,6 +80,14 @@ pub enum GetDatabaseError { ServerError(tonic::Status), } +/// Errors returned by Client::list_chunks +#[derive(Debug, Error)] +pub enum ListChunksError { + /// Client received an unexpected error from the server + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + ServerError(tonic::Status), +} + /// Errors returned by Client::list_remotes #[derive(Debug, Error)] pub enum ListRemotesError { @@ -215,6 +223,21 @@ impl Client { Ok(rules) } + /// List chunks in a database. + pub async fn list_chunks( + &mut self, + db_name: impl Into, + ) -> Result, ListChunksError> { + let db_name = db_name.into(); + + let response = self + .inner + .list_chunks(ListChunksRequest { db_name }) + .await + .map_err(ListChunksError::ServerError)?; + Ok(response.into_inner().chunks) + } + /// List remotes. 
pub async fn list_remotes(&mut self) -> Result, ListRemotesError> { let response = self diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index cea5ad12a8..4f57e0bd06 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -61,14 +61,14 @@ impl Client { /// [LineProtocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format) pub async fn write( &mut self, - name: impl Into, + db_name: impl Into, lp_data: impl Into, ) -> Result { - let name = name.into(); + let db_name = db_name.into(); let lp_data = lp_data.into(); let response = self .inner - .write(WriteRequest { name, lp_data }) + .write(WriteRequest { db_name, lp_data }) .await .map_err(WriteError::ServerError)?; diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs index 11e0067ee0..200b64c307 100644 --- a/mutable_buffer/src/database.rs +++ b/mutable_buffer/src/database.rs @@ -79,6 +79,14 @@ impl MutableBufferDb { } } + /// returns the id of the current open chunk in the specified partition + pub fn open_chunk_id(&self, partition_key: &str) -> u32 { + let partition = self.get_partition(partition_key); + let partition = partition.read().expect("mutex poisoned"); + + partition.open_chunk_id() + } + /// Directs the writes from batch into the appropriate partitions fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> { if let Some(entries) = batch.entries() { diff --git a/mutable_buffer/src/partition.rs b/mutable_buffer/src/partition.rs index e64036056a..0d588cadd0 100644 --- a/mutable_buffer/src/partition.rs +++ b/mutable_buffer/src/partition.rs @@ -107,6 +107,11 @@ impl Partition { } } + /// returns the id of the current open chunk in this partition + pub(crate) fn open_chunk_id(&self) -> u32 { + self.open_chunk.id() + } + /// write data to the open chunk pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { assert_eq!( diff --git a/query/src/lib.rs b/query/src/lib.rs index c3b9fecef6..b1a3a53b74 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -9,7 +9,8 @@ use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream; use async_trait::async_trait; use data_types::{ - data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema, selection::Selection, + chunk::ChunkSummary, data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema, + selection::Selection, }; use exec::{stringset::StringSet, Executor}; @@ -49,6 +50,9 @@ pub trait Database: Debug + Send + Sync { /// covering set means that together the chunks make up a single /// complete copy of the data being queried. fn chunks(&self, partition_key: &str) -> Vec>; + + /// Return a summary of all chunks in this database, in all partitions + fn chunk_summaries(&self) -> Result, Self::Error>; } /// Collection of data that shares the same partition key @@ -60,7 +64,7 @@ pub trait PartitionChunk: Debug + Send + Sync { /// particular partition. 
fn id(&self) -> u32; - /// returns the partition metadata stats for every table in the partition + /// returns the partition metadata stats for every table in the chunk fn table_stats(&self) -> Result, Self::Error>; /// Returns true if this chunk *might* have data that passes the diff --git a/query/src/test.rs b/query/src/test.rs index 642c5a0f27..8117dae1df 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -154,6 +154,10 @@ impl Database for TestDatabase { vec![] } } + + fn chunk_summaries(&self) -> Result, Self::Error> { + unimplemented!("summaries not implemented TestDatabase") + } } #[derive(Debug, Default)] diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index b8d9f13751..b0df18b8e5 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -177,6 +177,25 @@ impl Database { .unwrap_or_default() } + /// Returns the total estimated size in bytes for the chunks in the + /// specified partition. Returns None if there is no such partition + pub fn chunks_size<'a>( + &self, + partition_key: &str, + chunk_ids: impl IntoIterator, + ) -> Option { + let partition_data = self.data.read().unwrap(); + + let partition = partition_data.partitions.get(partition_key); + + partition.map(|partition| { + chunk_ids + .into_iter() + .map(|chunk_id| partition.chunk_size(*chunk_id)) + .sum::() + }) + } + /// Returns the total estimated size in bytes of the database. pub fn size(&self) -> u64 { let base_size = std::mem::size_of::(); @@ -662,6 +681,16 @@ impl Partition { .map(|chunk| std::mem::size_of::() as u64 + chunk.size()) .sum::() } + + /// The total estimated size in bytes of the specified chunk id + pub fn chunk_size(&self, chunk_id: u32) -> u64 { + let chunk_data = self.data.read().unwrap(); + chunk_data + .chunks + .get(&chunk_id) + .map(|chunk| chunk.size()) + .unwrap_or(0) // treat unknown chunks as zero size + } } /// ReadFilterResults implements ... diff --git a/server/src/db.rs b/server/src/db.rs index b5ad60244b..e026994162 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -10,7 +10,9 @@ use std::{ }; use async_trait::async_trait; -use data_types::{data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection}; +use data_types::{ + chunk::ChunkSummary, data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection, +}; use mutable_buffer::MutableBufferDb; use parking_lot::Mutex; use query::{Database, PartitionChunk}; @@ -121,7 +123,7 @@ impl Db { local_store .rollover_partition(partition_key) .context(RollingPartition) - .map(DBChunk::new_mb) + .map(|c| DBChunk::new_mb(c, partition_key, false)) } else { DatatbaseNotWriteable {}.fail() } @@ -131,10 +133,15 @@ impl Db { // potentially be migrated into the read buffer or object store) pub fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec> { let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() { + let open_chunk_id = mutable_buffer.open_chunk_id(partition_key); + mutable_buffer .chunks(partition_key) .into_iter() - .map(DBChunk::new_mb) + .map(|c| { + let open = c.id() == open_chunk_id; + DBChunk::new_mb(c, partition_key, open) + }) .collect() } else { vec![] @@ -162,7 +169,7 @@ impl Db { .as_ref() .context(DatatbaseNotWriteable)? .drop_chunk(partition_key, chunk_id) - .map(DBChunk::new_mb) + .map(|c| DBChunk::new_mb(c, partition_key, false)) .context(MutableBufferDrop) } @@ -313,6 +320,21 @@ impl Database for Db { .partition_keys() .context(MutableBufferRead) } + + fn chunk_summaries(&self) -> Result> { + let summaries = self + .partition_keys()? 
+ .into_iter() + .map(|partition_key| { + self.mutable_buffer_chunks(&partition_key) + .into_iter() + .chain(self.read_buffer_chunks(&partition_key).into_iter()) + .map(|c| c.summary()) + }) + .flatten() + .collect(); + Ok(summaries) + } } #[cfg(test)] @@ -324,8 +346,9 @@ mod tests { use arrow_deps::{ arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect, }; - use data_types::database_rules::{ - MutableBufferConfig, Order, PartitionSort, PartitionSortRules, + use data_types::{ + chunk::ChunkStorage, + database_rules::{MutableBufferConfig, Order, PartitionSort, PartitionSortRules}, }; use query::{ exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk, @@ -559,6 +582,75 @@ mod tests { db.rules.mutable_buffer_config = Some(mbconf); } + #[tokio::test] + async fn chunk_summaries() { + // Test that chunk id listing is hooked up + let db = make_db(); + let mut writer = TestLPWriter::default(); + + // get three chunks: one open, one closed in mb and one close in rb + + writer.write_lp_string(&db, "cpu bar=1 1").await.unwrap(); + db.rollover_partition("1970-01-01T00").await.unwrap(); + + writer + .write_lp_string(&db, "cpu bar=1,baz=2 2") + .await + .unwrap(); + + // a fourth chunk in a different partition + writer + .write_lp_string(&db, "cpu bar=1,baz2,frob=3 400000000000000") + .await + .unwrap(); + + print!("Partitions: {:?}", db.partition_keys().unwrap()); + + db.load_chunk_to_read_buffer("1970-01-01T00", 0) + .await + .unwrap(); + + fn to_arc(s: &str) -> Arc { + Arc::new(s.to_string()) + } + + let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return"); + chunk_summaries.sort_unstable(); + + let expected = vec![ + ChunkSummary { + partition_key: to_arc("1970-01-01T00"), + id: 0, + storage: ChunkStorage::ClosedMutableBuffer, + estimated_bytes: 70, + }, + ChunkSummary { + partition_key: to_arc("1970-01-01T00"), + id: 0, + storage: ChunkStorage::ReadBuffer, + estimated_bytes: 1221, + }, + ChunkSummary { + partition_key: to_arc("1970-01-01T00"), + id: 1, + storage: ChunkStorage::OpenMutableBuffer, + estimated_bytes: 101, + }, + ChunkSummary { + partition_key: to_arc("1970-01-05T15"), + id: 0, + storage: ChunkStorage::OpenMutableBuffer, + estimated_bytes: 107, + }, + ]; + + assert_eq!( + expected, chunk_summaries, + "expected:\n{:#?}\n\nactual:{:#?}\n\n", + expected, chunk_summaries + ); + } + // run a sql query against the database, returning the results as record batches async fn run_query(db: &Db, query: &str) -> Vec { let planner = SQLQueryPlanner::default(); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 185f033d32..377edf546f 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -1,5 +1,9 @@ use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream; -use data_types::{schema::Schema, selection::Selection}; +use data_types::{ + chunk::{ChunkStorage, ChunkSummary}, + schema::Schema, + selection::Selection, +}; use mutable_buffer::chunk::Chunk as MBChunk; use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk}; use read_buffer::Database as ReadBufferDb; @@ -58,10 +62,13 @@ pub type Result = std::result::Result; pub enum DBChunk { MutableBuffer { chunk: Arc, + partition_key: Arc, + /// is this chunk open for writing? 
+ open: bool, }, ReadBuffer { db: Arc, - partition_key: String, + partition_key: Arc, chunk_id: u32, }, ParquetFile, // TODO add appropriate type here @@ -69,8 +76,17 @@ pub enum DBChunk { impl DBChunk { /// Create a new mutable buffer chunk - pub fn new_mb(chunk: Arc) -> Arc { - Arc::new(Self::MutableBuffer { chunk }) + pub fn new_mb( + chunk: Arc, + partition_key: impl Into, + open: bool, + ) -> Arc { + let partition_key = Arc::new(partition_key.into()); + Arc::new(Self::MutableBuffer { + chunk, + partition_key, + open, + }) } /// create a new read buffer chunk @@ -79,13 +95,54 @@ impl DBChunk { partition_key: impl Into, chunk_id: u32, ) -> Arc { - let partition_key = partition_key.into(); + let partition_key = Arc::new(partition_key.into()); Arc::new(Self::ReadBuffer { db, chunk_id, partition_key, }) } + + pub fn summary(&self) -> ChunkSummary { + match self { + Self::MutableBuffer { + chunk, + partition_key, + open, + } => { + let storage = if *open { + ChunkStorage::OpenMutableBuffer + } else { + ChunkStorage::ClosedMutableBuffer + }; + ChunkSummary { + partition_key: Arc::clone(partition_key), + id: chunk.id(), + storage, + estimated_bytes: chunk.size(), + } + } + Self::ReadBuffer { + db, + partition_key, + chunk_id, + } => { + let estimated_bytes = db + .chunks_size(partition_key.as_ref(), &[*chunk_id]) + .unwrap_or(0) as usize; + + ChunkSummary { + partition_key: Arc::clone(&partition_key), + id: *chunk_id, + storage: ChunkStorage::ReadBuffer, + estimated_bytes, + } + } + Self::ParquetFile => { + unimplemented!("parquet file summary not implemented") + } + } + } } #[async_trait] @@ -94,7 +151,7 @@ impl PartitionChunk for DBChunk { fn id(&self) -> u32 { match self { - Self::MutableBuffer { chunk } => chunk.id(), + Self::MutableBuffer { chunk, .. } => chunk.id(), Self::ReadBuffer { chunk_id, .. } => *chunk_id, Self::ParquetFile => unimplemented!("parquet file not implemented"), } @@ -104,7 +161,7 @@ impl PartitionChunk for DBChunk { &self, ) -> Result, Self::Error> { match self { - Self::MutableBuffer { chunk } => chunk.table_stats().context(MutableBufferChunk), + Self::MutableBuffer { chunk, .. } => chunk.table_stats().context(MutableBufferChunk), Self::ReadBuffer { .. } => unimplemented!("read buffer not implemented"), Self::ParquetFile => unimplemented!("parquet file not implemented"), } @@ -116,7 +173,7 @@ impl PartitionChunk for DBChunk { _known_tables: &StringSet, ) -> Result, Self::Error> { let names = match self { - Self::MutableBuffer { chunk } => { + Self::MutableBuffer { chunk, .. } => { if chunk.is_empty() { Some(StringSet::new()) } else { @@ -192,7 +249,7 @@ impl PartitionChunk for DBChunk { selection: Selection<'_>, ) -> Result { match self { - DBChunk::MutableBuffer { chunk } => chunk + DBChunk::MutableBuffer { chunk, .. } => chunk .table_schema(table_name, selection) .context(MutableBufferChunk), DBChunk::ReadBuffer { @@ -235,7 +292,7 @@ impl PartitionChunk for DBChunk { fn has_table(&self, table_name: &str) -> bool { match self { - Self::MutableBuffer { chunk } => chunk.has_table(table_name), + Self::MutableBuffer { chunk, .. } => chunk.has_table(table_name), Self::ReadBuffer { db, partition_key, @@ -257,7 +314,7 @@ impl PartitionChunk for DBChunk { selection: Selection<'_>, ) -> Result { match self { - Self::MutableBuffer { chunk } => { + Self::MutableBuffer { chunk, .. 
} => { // Note MutableBuffer doesn't support predicate // pushdown (other than pruning out the entire chunk // via `might_pass_predicate) @@ -341,7 +398,7 @@ impl PartitionChunk for DBChunk { columns: Selection<'_>, ) -> Result, Self::Error> { match self { - Self::MutableBuffer { chunk } => { + Self::MutableBuffer { chunk, .. } => { let chunk_predicate = match to_mutable_buffer_predicate(chunk, predicate) { Ok(chunk_predicate) => chunk_predicate, Err(e) => { @@ -389,7 +446,7 @@ impl PartitionChunk for DBChunk { predicate: &Predicate, ) -> Result, Self::Error> { match self { - Self::MutableBuffer { chunk } => { + Self::MutableBuffer { chunk, .. } => { use mutable_buffer::chunk::Error::UnsupportedColumnTypeForListingValues; let chunk_predicate = match to_mutable_buffer_predicate(chunk, predicate) { diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index 69a7d1726f..403435fdfa 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -442,7 +442,7 @@ mem,host=A,region=west used=45 1 ]; let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let chunk = DBChunk::new_mb(Arc::new(ChunkWB::new(11))); + let chunk = DBChunk::new_mb(Arc::new(ChunkWB::new(11)), "key", false); let mut metadata_path = store.new_path(); metadata_path.push_dir("meta"); diff --git a/src/commands/database.rs b/src/commands/database.rs index 5efbc80847..74b2a12df1 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -13,6 +13,8 @@ use influxdb_iox_client::{ use structopt::StructOpt; use thiserror::Error; +mod chunk; + #[derive(Debug, Error)] pub enum Error { #[error("Error creating database: {0}")] @@ -41,6 +43,9 @@ pub enum Error { #[error("Error querying: {0}")] Query(#[from] influxdb_iox_client::flight::Error), + + #[error("Error in chunk subcommand: {0}")] + Chunk(#[from] chunk::Error), } pub type Result = std::result::Result; @@ -101,10 +106,11 @@ enum Command { Get(Get), Write(Write), Query(Query), + Chunk(chunk::Config), } pub async fn command(url: String, config: Config) -> Result<()> { - let connection = Builder::default().build(url).await?; + let connection = Builder::default().build(url.clone()).await?; match config.command { Command::Create(command) => { @@ -173,8 +179,12 @@ pub async fn command(url: String, config: Config) -> Result<()> { } let formatted_result = format.format(&batches)?; + println!("{}", formatted_result); } + Command::Chunk(config) => { + chunk::command(url, config).await?; + } } Ok(()) diff --git a/src/commands/database/chunk.rs b/src/commands/database/chunk.rs new file mode 100644 index 0000000000..6f8c3a3806 --- /dev/null +++ b/src/commands/database/chunk.rs @@ -0,0 +1,73 @@ +//! 
This module implements the `chunk` CLI command +use data_types::chunk::ChunkSummary; +use generated_types::google::FieldViolation; +use influxdb_iox_client::{ + connection::Builder, + management::{self, ListChunksError}, +}; +use std::convert::TryFrom; +use structopt::StructOpt; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error listing chunks: {0}")] + ListChunkError(#[from] ListChunksError), + + #[error("Error interpreting server response: {:?}", .0)] + ConvertingResponse(FieldViolation), + + #[error("Error rendering response as JSON: {0}")] + WritingJson(#[from] serde_json::Error), + + #[error("Error connecting to IOx: {0}")] + ConnectionError(#[from] influxdb_iox_client::connection::Error), +} + +pub type Result = std::result::Result; + +/// Manage IOx databases +#[derive(Debug, StructOpt)] +pub struct Config { + #[structopt(subcommand)] + command: Command, +} + +/// List the chunks for the specified database in JSON format +#[derive(Debug, StructOpt)] +struct List { + /// The name of the database + db_name: String, +} + +/// All possible subcommands for chunk +#[derive(Debug, StructOpt)] +enum Command { + List(List), +} + +pub async fn command(url: String, config: Config) -> Result<()> { + let connection = Builder::default().build(url).await?; + + match config.command { + Command::List(get) => { + let List { db_name } = get; + + let mut client = management::Client::new(connection); + + let chunks = client + .list_chunks(db_name) + .await + .map_err(Error::ListChunkError)?; + + let chunks = chunks + .into_iter() + .map(|c| ChunkSummary::try_from(c).map_err(Error::ConvertingResponse)) + .collect::>>()?; + + serde_json::to_writer_pretty(std::io::stdout(), &chunks).map_err(Error::WritingJson)?; + } + } + + Ok(()) +} diff --git a/src/influxdb_ioxd/rpc/error.rs b/src/influxdb_ioxd/rpc/error.rs index 8c80714a01..e6f71d8dc3 100644 --- a/src/influxdb_ioxd/rpc/error.rs +++ b/src/influxdb_ioxd/rpc/error.rs @@ -1,10 +1,10 @@ use generated_types::google::{InternalError, NotFound, PreconditionViolation}; use tracing::error; -use server::Error; +/// map common `server::Error` errors to the appropriate tonic Status +pub fn default_server_error_handler(error: server::Error) -> tonic::Status { + use server::Error; -/// map common server errors to the appropriate tonic Status -pub fn default_error_handler(error: Error) -> tonic::Status { match error { Error::IdNotSet => PreconditionViolation { category: "Writer ID".to_string(), @@ -24,3 +24,26 @@ pub fn default_error_handler(error: Error) -> tonic::Status { } } } + +/// map common `server::db::Error` errors to the appropriate tonic Status +pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status { + use server::db::Error; + match error { + Error::DatabaseNotReadable {} => PreconditionViolation { + category: "database".to_string(), + subject: "influxdata.com/iox".to_string(), + description: "Cannot read from database: no mutable buffer configured".to_string(), + } + .into(), + Error::DatatbaseNotWriteable {} => PreconditionViolation { + category: "database".to_string(), + subject: "influxdata.com/iox".to_string(), + description: "Cannot write to database: no mutable buffer configured".to_string(), + } + .into(), + error => { + error!(?error, "Unexpected error"); + InternalError {}.into() + } + } +} diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 926588f09c..570d57875a 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs 
@@ -6,7 +6,7 @@ use data_types::database_rules::DatabaseRules; use data_types::DatabaseName; use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; use generated_types::influxdata::iox::management::v1::*; -use query::DatabaseStore; +use query::{Database, DatabaseStore}; use server::{ConnectionManager, Error, Server}; use tonic::{Request, Response, Status}; @@ -14,7 +14,7 @@ struct ManagementService { server: Arc>, } -use super::error::default_error_handler; +use super::error::{default_db_error_handler, default_server_error_handler}; #[tonic::async_trait] impl management_service_server::ManagementService for ManagementService @@ -92,10 +92,41 @@ where } .into()) } - Err(e) => Err(default_error_handler(e)), + Err(e) => Err(default_server_error_handler(e)), } } + async fn list_chunks( + &self, + request: Request, + ) -> Result, Status> { + let db_name = DatabaseName::new(request.into_inner().db_name).field("db_name")?; + + let db = match self.server.db(&db_name) { + Some(db) => db, + None => { + return Err(NotFound { + resource_type: "database".to_string(), + resource_name: db_name.to_string(), + ..Default::default() + } + .into()) + } + }; + + let chunk_summaries = match db.chunk_summaries() { + Ok(chunk_summaries) => chunk_summaries, + Err(e) => return Err(default_db_error_handler(e)), + }; + + let chunks: Vec = chunk_summaries + .into_iter() + .map(|summary| summary.into()) + .collect(); + + Ok(Response::new(ListChunksResponse { chunks })) + } + async fn list_remotes( &self, _: Request, diff --git a/src/influxdb_ioxd/rpc/write.rs b/src/influxdb_ioxd/rpc/write.rs index 2b3eafaea1..67e48fb438 100644 --- a/src/influxdb_ioxd/rpc/write.rs +++ b/src/influxdb_ioxd/rpc/write.rs @@ -7,7 +7,7 @@ use std::fmt::Debug; use tonic::Response; use tracing::debug; -use super::error::default_error_handler; +use super::error::default_server_error_handler; /// Implementation of the write service struct WriteService { @@ -25,7 +25,7 @@ where ) -> Result, tonic::Status> { let request = request.into_inner(); - let db_name = request.name; + let db_name = request.db_name; let lp_data = request.lp_data; let lp_chars = lp_data.len(); @@ -42,7 +42,7 @@ where self.server .write_lines(&db_name, &lines) .await - .map_err(default_error_handler)?; + .map_err(default_server_error_handler)?; let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index ae21281858..2bc096ace1 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -3,25 +3,17 @@ use std::num::NonZeroU32; use generated_types::google::protobuf::Empty; use generated_types::{google::protobuf::Duration, influxdata::iox::management::v1::*}; use influxdb_iox_client::management::{Client, CreateDatabaseError}; +use test_helpers::assert_contains; use crate::common::server_fixture::ServerFixture; -use super::util::rand_name; +use super::util::{create_readable_database, create_unreadable_database, rand_name}; #[tokio::test] -pub async fn test() { +async fn test_list_update_remotes() { let server_fixture = ServerFixture::create_single_use().await; let mut client = Client::new(server_fixture.grpc_channel()); - test_list_update_remotes(&mut client).await; - test_set_get_writer_id(&mut client).await; - test_create_database_duplicate_name(&mut client).await; - test_create_database_invalid_name(&mut client).await; - test_list_databases(&mut client).await; - 
test_create_get_database(&mut client).await; -} - -async fn test_list_update_remotes(client: &mut Client) { const TEST_REMOTE_ID_1: u32 = 42; const TEST_REMOTE_ADDR_1: &str = "1.2.3.4:1234"; const TEST_REMOTE_ID_2: u32 = 84; @@ -77,7 +69,11 @@ async fn test_list_update_remotes(client: &mut Client) { assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2_UPDATED); } -async fn test_set_get_writer_id(client: &mut Client) { +#[tokio::test] +async fn test_set_get_writer_id() { + let server_fixture = ServerFixture::create_single_use().await; + let mut client = Client::new(server_fixture.grpc_channel()); + const TEST_ID: u32 = 42; client @@ -90,7 +86,11 @@ async fn test_set_get_writer_id(client: &mut Client) { assert_eq!(got.get(), TEST_ID); } -async fn test_create_database_duplicate_name(client: &mut Client) { +#[tokio::test] +async fn test_create_database_duplicate_name() { + let server_fixture = ServerFixture::create_shared().await; + let mut client = Client::new(server_fixture.grpc_channel()); + let db_name = rand_name(); client @@ -115,7 +115,11 @@ async fn test_create_database_duplicate_name(client: &mut Client) { )) } -async fn test_create_database_invalid_name(client: &mut Client) { +#[tokio::test] +async fn test_create_database_invalid_name() { + let server_fixture = ServerFixture::create_shared().await; + let mut client = Client::new(server_fixture.grpc_channel()); + let err = client .create_database(DatabaseRules { name: "my_example\ndb".to_string(), @@ -127,7 +131,11 @@ async fn test_create_database_invalid_name(client: &mut Client) { assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_))); } -async fn test_list_databases(client: &mut Client) { +#[tokio::test] +async fn test_list_databases() { + let server_fixture = ServerFixture::create_shared().await; + let mut client = Client::new(server_fixture.grpc_channel()); + let name = rand_name(); client .create_database(DatabaseRules { @@ -144,7 +152,11 @@ async fn test_list_databases(client: &mut Client) { assert!(names.contains(&name)); } -async fn test_create_get_database(client: &mut Client) { +#[tokio::test] +async fn test_create_get_database() { + let server_fixture = ServerFixture::create_shared().await; + let mut client = Client::new(server_fixture.grpc_channel()); + let db_name = rand_name(); // Specify everything to allow direct comparison between request and response @@ -191,3 +203,87 @@ async fn test_create_get_database(client: &mut Client) { assert_eq!(response, rules); } + +#[tokio::test] +async fn test_chunk_get() { + use generated_types::influxdata::iox::management::v1::{Chunk, ChunkStorage}; + + let fixture = ServerFixture::create_shared().await; + let mut management_client = Client::new(fixture.grpc_channel()); + let mut write_client = influxdb_iox_client::write::Client::new(fixture.grpc_channel()); + + let db_name = rand_name(); + create_readable_database(&db_name, fixture.grpc_channel()).await; + + let lp_lines = vec![ + "cpu,region=west user=23.2 100", + "cpu,region=west user=21.0 150", + "disk,region=east bytes=99i 200", + ]; + + write_client + .write(&db_name, lp_lines.join("\n")) + .await + .expect("write succeded"); + + let mut chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + + // ensure the output order is consistent + chunks.sort_by(|c1, c2| c1.partition_key.cmp(&c2.partition_key)); + + let expected: Vec = vec![ + Chunk { + partition_key: "cpu".into(), + id: 0, + storage: ChunkStorage::OpenMutableBuffer as i32, + estimated_bytes: 145, + }, + Chunk { + 
partition_key: "disk".into(), + id: 0, + storage: ChunkStorage::OpenMutableBuffer as i32, + estimated_bytes: 107, + }, + ]; + assert_eq!( + expected, chunks, + "expected:\n\n{:#?}\n\nactual:{:#?}", + expected, chunks + ); +} + +#[tokio::test] +async fn test_chunk_get_errors() { + let fixture = ServerFixture::create_shared().await; + let mut management_client = Client::new(fixture.grpc_channel()); + let db_name = rand_name(); + + let err = management_client + .list_chunks(&db_name) + .await + .expect_err("no db had been created"); + + assert_contains!( + err.to_string(), + "Some requested entity was not found: Resource database" + ); + + create_unreadable_database(&db_name, fixture.grpc_channel()).await; + + let err = management_client + .list_chunks(&db_name) + .await + .expect_err("db can't be read"); + + assert_contains!( + err.to_string(), + "Precondition violation influxdata.com/iox - database" + ); + assert_contains!( + err.to_string(), + "Cannot read from database: no mutable buffer configured" + ); +} diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 81532fbf3b..8ef8688682 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -1,19 +1,15 @@ use assert_cmd::Command; use predicates::prelude::*; +use test_helpers::make_temp_file; use crate::common::server_fixture::ServerFixture; +use super::util::{create_readable_database, rand_name}; + #[tokio::test] -pub async fn test() { - let server_fixture = ServerFixture::create_single_use().await; +async fn test_writer_id() { + let server_fixture = ServerFixture::create_shared().await; let addr = server_fixture.grpc_base(); - - test_writer_id(addr).await; - test_create_database(addr).await; - test_remotes(addr).await; -} - -async fn test_writer_id(addr: &str) { Command::cargo_bin("influxdb_iox") .unwrap() .arg("writer") @@ -36,8 +32,12 @@ async fn test_writer_id(addr: &str) { .stdout(predicate::str::contains("32")); } -async fn test_create_database(addr: &str) { - let db = "management-cli-test"; +#[tokio::test] +async fn test_create_database() { + let server_fixture = ServerFixture::create_shared().await; + let addr = server_fixture.grpc_base(); + let db_name = rand_name(); + let db = &db_name; Command::cargo_bin("influxdb_iox") .unwrap() @@ -83,7 +83,84 @@ async fn test_create_database(addr: &str) { .stdout(predicate::str::contains(format!("name: \"{}\"", db))); } -async fn test_remotes(addr: &str) { +#[tokio::test] +async fn test_get_chunks() { + let server_fixture = ServerFixture::create_shared().await; + let addr = server_fixture.grpc_base(); + let db_name = rand_name(); + + create_readable_database(&db_name, server_fixture.grpc_channel()).await; + + let lp_data = vec![ + "cpu,region=west user=23.2 100", + "cpu,region=west user=21.0 150", + ]; + + let lp_data_file = make_temp_file(lp_data.join("\n")); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("write") + .arg(&db_name) + .arg(lp_data_file.as_ref()) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("2 Lines OK")); + + let expected = r#"[ + { + "partition_key": "cpu", + "id": 0, + "storage": "OpenMutableBuffer", + "estimated_bytes": 145 + } +]"#; + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("chunk") + .arg("list") + .arg(&db_name) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(expected)); +} + +#[tokio::test] +async fn test_list_chunks_error() { 
+ let server_fixture = ServerFixture::create_shared().await; + let addr = server_fixture.grpc_base(); + let db_name = rand_name(); + + // note don't make the database, expect error + + // list the chunks + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("chunk") + .arg("list") + .arg(&db_name) + .arg("--host") + .arg(addr) + .assert() + .failure() + .stderr( + predicate::str::contains("Some requested entity was not found: Resource database") + .and(predicate::str::contains(&db_name)), + ); +} + +#[tokio::test] +async fn test_remotes() { + let server_fixture = ServerFixture::create_single_use().await; + let addr = server_fixture.grpc_base(); Command::cargo_bin("influxdb_iox") .unwrap() .arg("server") diff --git a/tests/end_to_end_cases/util.rs b/tests/end_to_end_cases/util.rs index f70c1cd241..a259fc9081 100644 --- a/tests/end_to_end_cases/util.rs +++ b/tests/end_to_end_cases/util.rs @@ -1,5 +1,8 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use generated_types::google::protobuf::Empty; +use generated_types::influxdata::iox::management::v1::*; + /// Return a random string suitable for use as a database name pub fn rand_name() -> String { thread_rng() @@ -8,3 +11,51 @@ pub fn rand_name() -> String { .map(char::from) .collect() } + +/// given a channel to talk with the managment api, create a new +/// database with the specified name configured with a 10MB mutable +/// buffer, partitioned on table +pub async fn create_readable_database( + db_name: impl Into, + channel: tonic::transport::Channel, +) { + let mut management_client = influxdb_iox_client::management::Client::new(channel); + + let rules = DatabaseRules { + name: db_name.into(), + partition_template: Some(PartitionTemplate { + parts: vec![partition_template::Part { + part: Some(partition_template::part::Part::Table(Empty {})), + }], + }), + mutable_buffer_config: Some(MutableBufferConfig { + buffer_size: 10 * 1024 * 1024, + ..Default::default() + }), + ..Default::default() + }; + + management_client + .create_database(rules.clone()) + .await + .expect("create database failed"); +} + +/// given a channel to talk with the managment api, create a new +/// database with no mutable buffer configured, no partitioning rules +pub async fn create_unreadable_database( + db_name: impl Into, + channel: tonic::transport::Channel, +) { + let mut management_client = influxdb_iox_client::management::Client::new(channel); + + let rules = DatabaseRules { + name: db_name.into(), + ..Default::default() + }; + + management_client + .create_database(rules.clone()) + .await + .expect("create database failed"); +} diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs index 9785e9e722..a1a67aa283 100644 --- a/tests/end_to_end_cases/write_api.rs +++ b/tests/end_to_end_cases/write_api.rs @@ -1,26 +1,17 @@ -use influxdb_iox_client::management::{self, generated_types::DatabaseRules}; use influxdb_iox_client::write::{self, WriteError}; use test_helpers::assert_contains; -use super::util::rand_name; - use crate::common::server_fixture::ServerFixture; +use super::util::{create_readable_database, rand_name}; + #[tokio::test] async fn test_write() { let fixture = ServerFixture::create_shared().await; - let mut management_client = management::Client::new(fixture.grpc_channel()); let mut write_client = write::Client::new(fixture.grpc_channel()); let db_name = rand_name(); - - management_client - .create_database(DatabaseRules { - name: db_name.clone(), - ..Default::default() - }) - .await - 
.expect("create database failed"); + create_readable_database(&db_name, fixture.grpc_channel()).await; let lp_lines = vec![ "cpu,region=west user=23.2 100", From 2456bc1b28daa0042187c88c101e8f7917360bfd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Mar 2021 09:30:45 -0500 Subject: [PATCH 3/9] refactor: impl Error for FieldViolation (#975) --- generated_types/src/google.rs | 14 +++++++++++++- src/commands/database/chunk.rs | 8 ++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/generated_types/src/google.rs b/generated_types/src/google.rs index 51339d87bc..75b204397e 100644 --- a/generated_types/src/google.rs +++ b/generated_types/src/google.rs @@ -84,6 +84,18 @@ impl FieldViolation { } } +impl std::error::Error for FieldViolation {} + +impl std::fmt::Display for FieldViolation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Violation for field \"{}\": {}", + self.field, self.description + ) + } +} + fn encode_bad_request(violation: Vec) -> Result { let mut buffer = BytesMut::new(); @@ -106,7 +118,7 @@ fn encode_bad_request(violation: Vec) -> Result for tonic::Status { fn from(f: FieldViolation) -> Self { - let message = format!("Violation for field \"{}\": {}", f.field, f.description); + let message = f.to_string(); match encode_bad_request(vec![f]) { Ok(details) => encode_status(tonic::Code::InvalidArgument, message, details), diff --git a/src/commands/database/chunk.rs b/src/commands/database/chunk.rs index 6f8c3a3806..b5ab4cf70c 100644 --- a/src/commands/database/chunk.rs +++ b/src/commands/database/chunk.rs @@ -14,8 +14,8 @@ pub enum Error { #[error("Error listing chunks: {0}")] ListChunkError(#[from] ListChunksError), - #[error("Error interpreting server response: {:?}", .0)] - ConvertingResponse(FieldViolation), + #[error("Error interpreting server response: {0}")] + ConvertingResponse(#[from] FieldViolation), #[error("Error rendering response as JSON: {0}")] WritingJson(#[from] serde_json::Error), @@ -62,8 +62,8 @@ pub async fn command(url: String, config: Config) -> Result<()> { let chunks = chunks .into_iter() - .map(|c| ChunkSummary::try_from(c).map_err(Error::ConvertingResponse)) - .collect::>>()?; + .map(ChunkSummary::try_from) + .collect::, FieldViolation>>()?; serde_json::to_writer_pretty(std::io::stdout(), &chunks).map_err(Error::WritingJson)?; } From 6fecf68bd40adba2d1aed112a8fd0ab81e52f844 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Mar 2021 09:43:49 -0500 Subject: [PATCH 4/9] feat: make CLI to listing database consistent with other commands (#974) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- src/commands/database.rs | 27 ++++++++++++++---------- tests/end_to_end_cases/management_cli.rs | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/commands/database.rs b/src/commands/database.rs index 74b2a12df1..208cc4e879 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -68,11 +68,15 @@ struct Create { mutable_buffer: Option, } -/// Get list of databases, or return configuration of specific database +/// Get list of databases +#[derive(Debug, StructOpt)] +struct List {} + +/// Return configuration of specific database #[derive(Debug, StructOpt)] struct Get { - /// If specified returns configuration of database - name: Option, + /// The name of the database + name: String, } /// Write data into the specified database @@ -103,6 +107,7 @@ struct Query { #[derive(Debug, StructOpt)] enum Command { Create(Create), + 
List(List), Get(Get), Write(Write), Query(Query), @@ -129,16 +134,16 @@ pub async fn command(url: String, config: Config) -> Result<()> { .await?; println!("Ok"); } + Command::List(_) => { + let mut client = management::Client::new(connection); + let databases = client.list_databases().await?; + println!("{}", databases.join(", ")) + } Command::Get(get) => { let mut client = management::Client::new(connection); - if let Some(name) = get.name { - let database = client.get_database(name).await?; - // TOOD: Do something better than this - println!("{:#?}", database); - } else { - let databases = client.list_databases().await?; - println!("{}", databases.join(", ")) - } + let database = client.get_database(get.name).await?; + // TOOD: Do something better than this + println!("{:#?}", database); } Command::Write(write) => { let mut client = write::Client::new(connection); diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 8ef8688682..2d1ba2597f 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -64,7 +64,7 @@ async fn test_create_database() { Command::cargo_bin("influxdb_iox") .unwrap() .arg("database") - .arg("get") + .arg("list") .arg("--host") .arg(addr) .assert() From 7e25c4e896df07eebde3e308d7dc0bac65bc411b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 12 Mar 2021 15:01:27 +0000 Subject: [PATCH 5/9] feat: add fanout task tracking (#956) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 2 + data_types/src/job.rs | 24 + data_types/src/lib.rs | 4 +- generated_types/build.rs | 1 + .../influxdata/iox/management/v1/jobs.proto | 23 + .../iox/management/v1/service.proto | 18 +- server/Cargo.toml | 2 + server/src/buffer.rs | 17 +- server/src/lib.rs | 128 ++-- server/src/tracker.rs | 637 ++++++++++++++---- server/src/tracker/future.rs | 87 +++ server/src/tracker/registry.rs | 126 ++++ src/influxdb_ioxd.rs | 8 +- src/influxdb_ioxd/rpc.rs | 4 +- src/influxdb_ioxd/rpc/management.rs | 10 + src/influxdb_ioxd/rpc/operations.rs | 182 +++++ 16 files changed, 1070 insertions(+), 203 deletions(-) create mode 100644 data_types/src/job.rs create mode 100644 generated_types/protos/influxdata/iox/management/v1/jobs.proto create mode 100644 server/src/tracker/future.rs create mode 100644 server/src/tracker/registry.rs create mode 100644 src/influxdb_ioxd/rpc/operations.rs diff --git a/Cargo.lock b/Cargo.lock index e1101ea91b..323119d758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3195,6 +3195,7 @@ dependencies = [ "flatbuffers 0.6.1", "futures", "generated_types", + "hashbrown", "influxdb_line_protocol", "mutable_buffer", "object_store", @@ -3208,6 +3209,7 @@ dependencies = [ "snap", "test_helpers", "tokio", + "tokio-util", "tracing", "uuid", ] diff --git a/data_types/src/job.rs b/data_types/src/job.rs new file mode 100644 index 0000000000..738b39a938 --- /dev/null +++ b/data_types/src/job.rs @@ -0,0 +1,24 @@ +use generated_types::influxdata::iox::management::v1 as management; + +/// Metadata associated with a set of background tasks +/// Used in combination with TrackerRegistry +#[derive(Debug, Clone)] +pub enum Job { + PersistSegment { writer_id: u32, segment_id: u64 }, + Dummy { nanos: Vec }, +} + +impl From for management::operation_metadata::Job { + fn from(job: Job) -> Self { + match job { + Job::PersistSegment { + writer_id, + segment_id, + } => Self::PersistSegment(management::PersistSegment { + 
writer_id, + segment_id, + }), + Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }), + } + } +} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index c7edc30360..574feb3c27 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -10,6 +10,7 @@ clippy::clone_on_ref_ptr )] +pub use database_name::*; pub use schema::TIME_COLUMN_NAME; /// The name of the column containing table names returned by a call to @@ -25,6 +26,7 @@ pub mod data; pub mod database_rules; pub mod error; pub mod http; +pub mod job; pub mod names; pub mod partition_metadata; pub mod schema; @@ -33,6 +35,4 @@ pub mod timestamp; pub mod wal; mod database_name; -pub use database_name::*; - pub(crate) mod field_validation; diff --git a/generated_types/build.rs b/generated_types/build.rs index 7e66a0f15f..af953aa288 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -41,6 +41,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { management_path.join("database_rules.proto"), management_path.join("chunk.proto"), management_path.join("service.proto"), + management_path.join("jobs.proto"), write_path.join("service.proto"), root.join("grpc/health/v1/service.proto"), root.join("google/longrunning/operations.proto"), diff --git a/generated_types/protos/influxdata/iox/management/v1/jobs.proto b/generated_types/protos/influxdata/iox/management/v1/jobs.proto new file mode 100644 index 0000000000..01691dbfad --- /dev/null +++ b/generated_types/protos/influxdata/iox/management/v1/jobs.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +package influxdata.iox.management.v1; + +message OperationMetadata { + uint64 cpu_nanos = 1; + uint64 wall_nanos = 2; + uint64 task_count = 3; + uint64 pending_count = 4; + + oneof job { + Dummy dummy = 5; + PersistSegment persist_segment = 6; + } +} + +message PersistSegment { + uint32 writer_id = 1; + uint64 segment_id = 2; +} + +message Dummy { + repeated uint64 nanos = 1; +} diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index e09f9d652f..1a36a8e75a 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package influxdata.iox.management.v1; -import "google/protobuf/empty.proto"; +import "google/longrunning/operations.proto"; import "influxdata/iox/management/v1/database_rules.proto"; import "influxdata/iox/management/v1/chunk.proto"; @@ -27,6 +27,15 @@ service ManagementService { // Delete a reference to remote IOx server. 
rpc DeleteRemote(DeleteRemoteRequest) returns (DeleteRemoteResponse); + + // Creates a dummy job that for each value of the nanos field + // spawns a task that sleeps for that number of nanoseconds before returning + rpc CreateDummyJob(CreateDummyJobRequest) returns (CreateDummyJobResponse) { + option (google.longrunning.operation_info) = { + response_type: "google.protobuf.Empty" + metadata_type: "OperationMetadata" + }; + } } message GetWriterIdRequest {} @@ -70,6 +79,13 @@ message ListChunksResponse { repeated Chunk chunks = 1; } +message CreateDummyJobRequest { + repeated uint64 nanos = 1; +} + +message CreateDummyJobResponse { + google.longrunning.Operation operation = 1; +} message ListRemotesRequest {} diff --git a/server/Cargo.toml b/server/Cargo.toml index a82d9bd3e6..a68cc4d1fb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,6 +14,7 @@ data_types = { path = "../data_types" } flatbuffers = "0.6" futures = "0.3.7" generated_types = { path = "../generated_types" } +hashbrown = "0.9.1" influxdb_line_protocol = { path = "../influxdb_line_protocol" } mutable_buffer = { path = "../mutable_buffer" } object_store = { path = "../object_store" } @@ -26,6 +27,7 @@ serde_json = "1.0" snafu = "0.6" snap = "1.0.0" tokio = { version = "1.0", features = ["macros", "time"] } +tokio-util = { version = "0.6.3" } tracing = "0.1" uuid = { version = "0.8", features = ["serde", "v4"] } diff --git a/server/src/buffer.rs b/server/src/buffer.rs index 550719b7b4..7046cc1642 100644 --- a/server/src/buffer.rs +++ b/server/src/buffer.rs @@ -15,7 +15,7 @@ use std::{ sync::Arc, }; -use crate::tracker::{TrackedFutureExt, TrackerRegistry}; +use crate::tracker::{TrackedFutureExt, TrackerRegistration}; use bytes::Bytes; use chrono::{DateTime, Utc}; use crc32fast::Hasher; @@ -73,12 +73,6 @@ pub enum Error { InvalidFlatbuffersSegment, } -#[derive(Debug, Clone)] -pub struct SegmentPersistenceTask { - writer_id: u32, - location: object_store::path::Path, -} - pub type Result = std::result::Result; /// An in-memory buffer of a write ahead log. It is split up into segments, @@ -376,7 +370,7 @@ impl Segment { /// the given object store location. 
pub fn persist_bytes_in_background( &self, - reg: &TrackerRegistry, + tracker: TrackerRegistration, writer_id: u32, db_name: &DatabaseName<'_>, store: Arc, @@ -385,11 +379,6 @@ impl Segment { let location = database_object_store_path(writer_id, db_name, &store); let location = object_store_path_for_segment(&location, self.id)?; - let task_meta = SegmentPersistenceTask { - writer_id, - location: location.clone(), - }; - let len = data.len(); let mut stream_data = std::io::Result::Ok(data.clone()); @@ -414,7 +403,7 @@ impl Segment { // TODO: Mark segment as persisted info!("persisted data to {}", location.display()); } - .track(reg, task_meta), + .track(tracker), ); Ok(()) diff --git a/server/src/lib.rs b/server/src/lib.rs index 495e28ebbf..8b3c7969f0 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -67,43 +67,44 @@ clippy::clone_on_ref_ptr )] -pub mod buffer; -mod config; -pub mod db; -pub mod snapshot; -mod tracker; - -#[cfg(test)] -mod query_tests; - use std::sync::{ atomic::{AtomicU32, Ordering}, Arc, }; -use crate::{ - buffer::SegmentPersistenceTask, - config::{ - object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME, - }, - db::Db, - tracker::TrackerRegistry, -}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::TryStreamExt; +use snafu::{OptionExt, ResultExt, Snafu}; +use tracing::{error, info}; + use data_types::{ data::{lines_to_replicated_write, ReplicatedWrite}, - database_rules::DatabaseRules, + database_rules::{DatabaseRules, WriterId}, + job::Job, {DatabaseName, DatabaseNameError}, }; use influxdb_line_protocol::ParsedLine; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use query::{exec::Executor, DatabaseStore}; -use async_trait::async_trait; -use bytes::Bytes; -use data_types::database_rules::WriterId; -use futures::stream::TryStreamExt; -use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::error; +use crate::tracker::TrackedFutureExt; +use crate::{ + config::{ + object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME, + }, + db::Db, + tracker::{Tracker, TrackerId, TrackerRegistry}, +}; + +pub mod buffer; +mod config; +pub mod db; +pub mod snapshot; +pub mod tracker; + +#[cfg(test)] +mod query_tests; type DatabaseError = Box; @@ -157,7 +158,7 @@ pub struct Server { connection_manager: Arc, pub store: Arc, executor: Arc, - segment_persistence_registry: TrackerRegistry, + jobs: TrackerRegistry, } impl Server { @@ -168,7 +169,7 @@ impl Server { store, connection_manager: Arc::new(connection_manager), executor: Arc::new(Executor::new()), - segment_persistence_registry: TrackerRegistry::new(), + jobs: TrackerRegistry::new(), } } @@ -348,13 +349,14 @@ impl Server { if persist { let writer_id = self.require_id()?; let store = Arc::clone(&self.store); + + let (_, tracker) = self.jobs.register(Job::PersistSegment { + writer_id, + segment_id: segment.id, + }); + segment - .persist_bytes_in_background( - &self.segment_persistence_registry, - writer_id, - db_name, - store, - ) + .persist_bytes_in_background(tracker, writer_id, db_name, store) .context(WalError)?; } } @@ -382,6 +384,50 @@ impl Server { pub fn delete_remote(&self, id: WriterId) -> Option { self.config.delete_remote(id) } + + pub fn spawn_dummy_job(&self, nanos: Vec) -> Tracker { + let (tracker, registration) = self.jobs.register(Job::Dummy { + nanos: nanos.clone(), + }); + + for duration in nanos { + tokio::spawn( + tokio::time::sleep(tokio::time::Duration::from_nanos(duration)) + 
.track(registration.clone()), + ); + } + + tracker + } + + /// Returns a list of all jobs tracked by this server + pub fn tracked_jobs(&self) -> Vec> { + self.jobs.tracked() + } + + /// Returns a specific job tracked by this server + pub fn get_job(&self, id: TrackerId) -> Option> { + self.jobs.get(id) + } + + /// Background worker function + /// + /// TOOD: Handle termination (#827) + pub async fn background_worker(&self) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + + loop { + // TODO: Retain limited history of past jobs, e.g. enqueue returned data into a + // Dequeue + let reclaimed = self.jobs.reclaim(); + + for job in reclaimed { + info!(?job, "job finished"); + } + + interval.tick().await; + } + } } #[async_trait] @@ -508,20 +554,24 @@ async fn get_store_bytes( #[cfg(test)] mod tests { - use super::*; - use crate::buffer::Segment; - use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect}; + use std::collections::BTreeMap; + use async_trait::async_trait; + use futures::TryStreamExt; + use parking_lot::Mutex; + use snafu::Snafu; + + use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect}; use data_types::database_rules::{ PartitionTemplate, TemplatePart, WalBufferConfig, WalBufferRollover, }; - use futures::TryStreamExt; use influxdb_line_protocol::parse_lines; use object_store::{memory::InMemory, path::ObjectStorePath}; - use parking_lot::Mutex; use query::frontend::sql::SQLQueryPlanner; - use snafu::Snafu; - use std::collections::BTreeMap; + + use crate::buffer::Segment; + + use super::*; type TestError = Box; type Result = std::result::Result; diff --git a/server/src/tracker.rs b/server/src/tracker.rs index 07e0d0d8db..53493a91ba 100644 --- a/server/src/tracker.rs +++ b/server/src/tracker.rs @@ -1,241 +1,592 @@ -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; +//! This module contains a future tracking system supporting fanout, +//! cancellation and asynchronous signalling of completion +//! +//! A Tracker is created by calling TrackerRegistry::register. TrackedFutures +//! can then be associated with this Tracker and monitored and/or cancelled. +//! +//! This is used within IOx to track futures spawned as multiple tokio tasks. +//! +//! For example, when migrating a chunk from the mutable buffer to the read +//! buffer: +//! +//! - There is a single over-arching Job being performed +//! - A single tracker is allocated from a TrackerRegistry in Server and +//! associated with the Job metadata +//! - This tracker is registered with every future that is spawned as a tokio +//! task +//! +//! This same system may in future form part of a query tracking story +//! +//! # Correctness +//! +//! The key correctness property of the Tracker system is Tracker::is_complete +//! only returns true when all futures associated with the tracker have +//! completed and no more can be spawned. Additionally at such a point +//! all metrics - cpu_nanos, wall_nanos, created_futures should be visible +//! to the calling thread +//! +//! Note: there is no guarantee that pending_registrations or pending_futures +//! ever reaches 0, a program could call mem::forget on a TrackerRegistration, +//! leak the TrackerRegistration, spawn a future that never finishes, etc... +//! Such a program would never consider the Tracker complete and therefore this +//! doesn't violate the correctness property +//! +//! ## Proof +//! +//! 1. pending_registrations starts at 1, and is incremented on +//! TrackerRegistration::clone. 
A TrackerRegistration cannot be created from an +//! existing TrackerState, only another TrackerRegistration +//! +//! 2. pending_registrations is decremented with release semantics on +//! TrackerRegistration::drop +//! +//! 3. pending_futures is only incremented with a TrackerRegistration in scope +//! +//! 4. 2. + 3. -> A thread that increments pending_futures, decrements +//! pending_registrations with release semantics afterwards. By definition of +//! release semantics these writes to pending_futures cannot be reordered to +//! come after the atomic decrement of pending_registrations +//! +//! 5. 1. + 2. + drop cannot be called multiple times on the same object -> once +//! pending_registrations is decremented to 0 it can never be incremented again +//! +//! 6. 4. + 5. -> the decrement to 0 of pending_registrations must commit after +//! the last increment of pending_futures +//! +//! 7. pending_registrations is loaded with acquire semantics +//! +//! 8. By definition of acquire semantics, any thread that reads +//! pending_registrations is guaranteed to see any increments to pending_futures +//! performed before the most recent decrement of pending_registrations +//! +//! 9. 6. + 8. -> A thread that observes a pending_registrations of 0 cannot +//! subsequently observe pending_futures to increase +//! +//! 10. Tracker::is_complete returns if it observes pending_registrations to be +//! 0 and then pending_futures to be 0 +//! +//! 11. 9 + 10 -> A thread can only observe Tracker::is_complete() == true +//! after all futures have been dropped and no more can be created +//! +//! 12. pending_futures is decremented with Release semantics on +//! TrackedFuture::drop after any associated metrics have been incremented +//! +//! 13. pending_futures is loaded with acquire semantics +//! +//! 14. 12. + 13. -> A thread that observes a pending_futures of 0 is guaranteed +//! to see any metrics from any dropped TrackedFuture +//! +//! Note: this proof ignores the complexity of moving Trackers, TrackedFutures, +//! etc... between threads as any such functionality must perform the necessary +//! synchronisation to be well-formed. 
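+//!
+//! # Example
+//!
+//! A minimal usage sketch, assuming a tokio runtime; the `&str` metadata and
+//! the spawned future are placeholders chosen only for illustration:
+//!
+//! ```ignore
+//! let registry = TrackerRegistry::new();
+//! let (tracker, registration) = registry.register("example job");
+//!
+//! // Any number of futures can be associated with the same registration
+//! tokio::spawn(async { /* do some work */ }.track(registration));
+//!
+//! // The tracker can then be used to monitor or cancel those futures
+//! assert!(!tracker.is_complete());
+//! tracker.cancel();
+//! ```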
+ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::time::Instant; -use futures::prelude::*; -use parking_lot::Mutex; -use pin_project::{pin_project, pinned_drop}; +use tokio_util::sync::CancellationToken; +use tracing::warn; -/// Every future registered with a `TrackerRegistry` is assigned a unique -/// `TrackerId` -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TrackerId(usize); +pub use future::{TrackedFuture, TrackedFutureExt}; +pub use registry::{TrackerId, TrackerRegistry}; +mod future; +mod registry; + +/// The state shared between all sibling tasks #[derive(Debug)] -struct Tracker { - data: T, - abort: future::AbortHandle, +struct TrackerState { + start_instant: Instant, + cancel_token: CancellationToken, + cpu_nanos: AtomicUsize, + wall_nanos: AtomicUsize, + + created_futures: AtomicUsize, + pending_futures: AtomicUsize, + pending_registrations: AtomicUsize, + + watch: tokio::sync::watch::Receiver, } +/// A Tracker can be used to monitor/cancel/wait for a set of associated futures #[derive(Debug)] -struct TrackerContextInner { - id: AtomicUsize, - trackers: Mutex>>, +pub struct Tracker { + id: TrackerId, + state: Arc, + metadata: Arc, } -/// Allows tracking the lifecycle of futures registered by -/// `TrackedFutureExt::track` with an accompanying metadata payload of type T -/// -/// Additionally can trigger graceful termination of registered futures -#[derive(Debug)] -pub struct TrackerRegistry { - inner: Arc>, -} - -// Manual Clone to workaround https://github.com/rust-lang/rust/issues/26925 -impl Clone for TrackerRegistry { +impl Clone for Tracker { fn clone(&self) -> Self { Self { - inner: Arc::clone(&self.inner), + id: self.id, + state: Arc::clone(&self.state), + metadata: Arc::clone(&self.metadata), } } } -impl Default for TrackerRegistry { - fn default() -> Self { - Self { - inner: Arc::new(TrackerContextInner { - id: AtomicUsize::new(0), - trackers: Mutex::new(Default::default()), - }), - } - } -} - -impl TrackerRegistry { - pub fn new() -> Self { - Default::default() +impl Tracker { + /// Returns the ID of the Tracker - these are unique per TrackerRegistry + pub fn id(&self) -> TrackerId { + self.id } - /// Trigger graceful termination of a registered future - /// - /// Returns false if no future found with the provided ID + /// Returns a reference to the metadata stored within this Tracker + pub fn metadata(&self) -> &T { + &self.metadata + } + + /// Trigger graceful termination of any futures tracked by + /// this tracker /// /// Note: If the future is currently executing, termination /// will only occur when the future yields (returns from poll) - #[allow(dead_code)] - pub fn terminate(&self, id: TrackerId) -> bool { - if let Some(meta) = self.inner.trackers.lock().get_mut(&id) { - meta.abort.abort(); - true - } else { - false + /// and is then scheduled to run again + pub fn cancel(&self) { + self.state.cancel_token.cancel(); + } + + /// Returns the number of outstanding futures + pub fn pending_futures(&self) -> usize { + self.state.pending_futures.load(Ordering::Relaxed) + } + + /// Returns the number of TrackedFutures created with this Tracker + pub fn created_futures(&self) -> usize { + self.state.created_futures.load(Ordering::Relaxed) + } + + /// Returns the number of nanoseconds futures tracked by this + /// tracker have spent executing + pub fn cpu_nanos(&self) -> usize { + self.state.cpu_nanos.load(Ordering::Relaxed) + } + + /// Returns the number of nanoseconds since 
the Tracker was registered + /// to the time the last TrackedFuture was dropped + /// + /// Returns 0 if there are still pending tasks + pub fn wall_nanos(&self) -> usize { + if !self.is_complete() { + return 0; } + self.state.wall_nanos.load(Ordering::Relaxed) } - fn untrack(&self, id: &TrackerId) { - self.inner.trackers.lock().remove(id); + /// Returns true if all futures associated with this tracker have + /// been dropped and no more can be created + pub fn is_complete(&self) -> bool { + // The atomic decrement in TrackerRegistration::drop has release semantics + // acquire here ensures that if a thread observes the tracker to have + // no pending_registrations it cannot subsequently observe pending_futures + // to increase. If it could, observing pending_futures==0 would be insufficient + // to conclude there are no outstanding futures + let pending_registrations = self.state.pending_registrations.load(Ordering::Acquire); + + // The atomic decrement in TrackedFuture::drop has release semantics + // acquire therefore ensures that if a thread observes the completion of + // a TrackedFuture, it is guaranteed to see its updates (e.g. wall_nanos) + let pending_futures = self.state.pending_futures.load(Ordering::Acquire); + + pending_registrations == 0 && pending_futures == 0 } - fn track(&self, metadata: T) -> (TrackerId, future::AbortRegistration) { - let id = TrackerId(self.inner.id.fetch_add(1, Ordering::Relaxed)); - let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - - self.inner.trackers.lock().insert( - id, - Tracker { - abort: abort_handle, - data: metadata, - }, - ); - - (id, abort_registration) + /// Returns if this tracker has been cancelled + pub fn is_cancelled(&self) -> bool { + self.state.cancel_token.is_cancelled() } -} -impl TrackerRegistry { - /// Returns a list of tracked futures, with their accompanying IDs and - /// metadata - #[allow(dead_code)] - pub fn tracked(&self) -> Vec<(TrackerId, T)> { - // TODO: Improve this - (#711) - self.inner - .trackers - .lock() - .iter() - .map(|(id, value)| (*id, value.data.clone())) - .collect() - } -} + /// Blocks until all futures associated with the tracker have been + /// dropped and no more can be created + pub async fn join(&self) { + let mut watch = self.state.watch.clone(); -/// An extension trait that provides `self.track(reg, {})` allowing -/// registering this future with a `TrackerRegistry` -pub trait TrackedFutureExt: Future { - fn track(self, reg: &TrackerRegistry, metadata: T) -> TrackedFuture - where - Self: Sized, - { - let (id, registration) = reg.track(metadata); - - TrackedFuture { - inner: future::Abortable::new(self, registration), - reg: reg.clone(), - id, + // Wait until watch is set to true or the tx side is dropped + while !*watch.borrow() { + if watch.changed().await.is_err() { + // tx side has been dropped + warn!("tracker watch dropped without signalling"); + break; + } } } } -impl TrackedFutureExt for T where T: Future {} - -/// The `Future` returned by `TrackedFutureExt::track()` -/// Unregisters the future from the registered `TrackerRegistry` on drop -/// and provides the early termination functionality used by -/// `TrackerRegistry::terminate` -#[pin_project(PinnedDrop)] -pub struct TrackedFuture { - #[pin] - inner: future::Abortable, - - reg: TrackerRegistry, - id: TrackerId, +/// A TrackerRegistration is returned by TrackerRegistry::register and can be +/// used to register new TrackedFutures +/// +/// A tracker will not be considered completed until all TrackerRegistrations 
+/// referencing it have been dropped. This is to prevent a race where further +/// TrackedFutures are registered with a Tracker that has already signalled +/// completion +#[derive(Debug)] +pub struct TrackerRegistration { + state: Arc, } -impl Future for TrackedFuture { - type Output = Result; +impl Clone for TrackerRegistration { + fn clone(&self) -> Self { + self.state + .pending_registrations + .fetch_add(1, Ordering::Relaxed); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) + Self { + state: Arc::clone(&self.state), + } } } -#[pinned_drop] -impl PinnedDrop for TrackedFuture { - fn drop(self: Pin<&mut Self>) { - // Note: This could cause a double-panic in an extreme situation where - // the internal `TrackerRegistry` lock is poisoned and drop was - // called as part of unwinding the stack to handle another panic - let this = self.project(); - this.reg.untrack(this.id) +impl TrackerRegistration { + fn new(watch: tokio::sync::watch::Receiver) -> Self { + let state = Arc::new(TrackerState { + start_instant: Instant::now(), + cpu_nanos: AtomicUsize::new(0), + wall_nanos: AtomicUsize::new(0), + cancel_token: CancellationToken::new(), + created_futures: AtomicUsize::new(0), + pending_futures: AtomicUsize::new(0), + pending_registrations: AtomicUsize::new(1), + watch, + }); + + Self { state } + } +} + +impl Drop for TrackerRegistration { + fn drop(&mut self) { + let previous = self + .state + .pending_registrations + .fetch_sub(1, Ordering::Release); + assert_ne!(previous, 0); } } #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use tokio::sync::oneshot; #[tokio::test] async fn test_lifecycle() { let (sender, receive) = oneshot::channel(); - let reg = TrackerRegistry::new(); + let registry = TrackerRegistry::new(); + let (_, registration) = registry.register(()); - let task = tokio::spawn(receive.track(®, ())); + let task = tokio::spawn(receive.track(registration)); - assert_eq!(reg.tracked().len(), 1); + assert_eq!(registry.running().len(), 1); sender.send(()).unwrap(); task.await.unwrap().unwrap().unwrap(); - assert_eq!(reg.tracked().len(), 0); + assert_eq!(registry.running().len(), 0); } #[tokio::test] async fn test_interleaved() { let (sender1, receive1) = oneshot::channel(); let (sender2, receive2) = oneshot::channel(); - let reg = TrackerRegistry::new(); + let registry = TrackerRegistry::new(); + let (_, registration1) = registry.register(1); + let (_, registration2) = registry.register(2); - let task1 = tokio::spawn(receive1.track(®, 1)); - let task2 = tokio::spawn(receive2.track(®, 2)); + let task1 = tokio::spawn(receive1.track(registration1)); + let task2 = tokio::spawn(receive2.track(registration2)); - let mut tracked: Vec<_> = reg.tracked().iter().map(|x| x.1).collect(); - tracked.sort_unstable(); - assert_eq!(tracked, vec![1, 2]); + let tracked = sorted(registry.running()); + assert_eq!(get_metadata(&tracked), vec![1, 2]); sender2.send(()).unwrap(); task2.await.unwrap().unwrap().unwrap(); - let tracked: Vec<_> = reg.tracked().iter().map(|x| x.1).collect(); - assert_eq!(tracked, vec![1]); + let tracked: Vec<_> = sorted(registry.running()); + assert_eq!(get_metadata(&tracked), vec![1]); sender1.send(42).unwrap(); let ret = task1.await.unwrap().unwrap().unwrap(); assert_eq!(ret, 42); - assert_eq!(reg.tracked().len(), 0); + assert_eq!(registry.running().len(), 0); } #[tokio::test] async fn test_drop() { - let reg = TrackerRegistry::new(); + let registry = TrackerRegistry::new(); + let (_, registration) = 
registry.register(()); { - let f = futures::future::pending::<()>().track(®, ()); + let f = futures::future::pending::<()>().track(registration); - assert_eq!(reg.tracked().len(), 1); + assert_eq!(registry.running().len(), 1); std::mem::drop(f); } - assert_eq!(reg.tracked().len(), 0); + assert_eq!(registry.running().len(), 0); + } + + #[tokio::test] + async fn test_drop_multiple() { + let registry = TrackerRegistry::new(); + let (_, registration) = registry.register(()); + + { + let f = futures::future::pending::<()>().track(registration.clone()); + { + let f = futures::future::pending::<()>().track(registration); + assert_eq!(registry.running().len(), 1); + std::mem::drop(f); + } + assert_eq!(registry.running().len(), 1); + std::mem::drop(f); + } + + assert_eq!(registry.running().len(), 0); } #[tokio::test] async fn test_terminate() { - let reg = TrackerRegistry::new(); + let registry = TrackerRegistry::new(); + let (_, registration) = registry.register(()); - let task = tokio::spawn(futures::future::pending::<()>().track(®, ())); + let task = tokio::spawn(futures::future::pending::<()>().track(registration)); - let tracked = reg.tracked(); + let tracked = registry.running(); assert_eq!(tracked.len(), 1); - reg.terminate(tracked[0].0); + tracked[0].cancel(); let result = task.await.unwrap(); assert!(result.is_err()); - assert_eq!(reg.tracked().len(), 0); + assert_eq!(registry.running().len(), 0); + } + + #[tokio::test] + async fn test_terminate_early() { + let registry = TrackerRegistry::new(); + let (tracker, registration) = registry.register(()); + tracker.cancel(); + + let task1 = tokio::spawn(futures::future::pending::<()>().track(registration)); + let result1 = task1.await.unwrap(); + + assert!(result1.is_err()); + assert_eq!(registry.running().len(), 0); + } + + #[tokio::test] + async fn test_terminate_multiple() { + let registry = TrackerRegistry::new(); + let (_, registration) = registry.register(()); + + let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone())); + let task2 = tokio::spawn(futures::future::pending::<()>().track(registration)); + + let tracked = registry.running(); + assert_eq!(tracked.len(), 1); + + tracked[0].cancel(); + + let result1 = task1.await.unwrap(); + let result2 = task2.await.unwrap(); + + assert!(result1.is_err()); + assert!(result2.is_err()); + assert_eq!(registry.running().len(), 0); + } + + #[tokio::test] + async fn test_reclaim() { + let registry = TrackerRegistry::new(); + + let (_, registration1) = registry.register(1); + let (_, registration2) = registry.register(2); + let (_, registration3) = registry.register(3); + + let task1 = tokio::spawn(futures::future::pending::<()>().track(registration1.clone())); + let task2 = tokio::spawn(futures::future::pending::<()>().track(registration1)); + let task3 = tokio::spawn(futures::future::ready(()).track(registration2.clone())); + let task4 = tokio::spawn(futures::future::pending::<()>().track(registration2)); + let task5 = tokio::spawn(futures::future::pending::<()>().track(registration3)); + + let running = sorted(registry.running()); + let tracked = sorted(registry.tracked()); + + assert_eq!(running.len(), 3); + assert_eq!(get_metadata(&running), vec![1, 2, 3]); + assert_eq!(tracked.len(), 3); + assert_eq!(get_metadata(&tracked), vec![1, 2, 3]); + + // Trigger termination of task1 and task2 + running[0].cancel(); + + let result1 = task1.await.unwrap(); + let result2 = task2.await.unwrap(); + + assert!(result1.is_err()); + assert!(result2.is_err()); + + let running = 
sorted(registry.running()); + let tracked = sorted(registry.tracked()); + + assert_eq!(running.len(), 2); + assert_eq!(get_metadata(&running), vec![2, 3]); + assert_eq!(tracked.len(), 3); + assert_eq!(get_metadata(&tracked), vec![1, 2, 3]); + + // Expect reclaim to find now finished registration1 + let reclaimed = sorted(registry.reclaim()); + assert_eq!(reclaimed.len(), 1); + assert_eq!(get_metadata(&reclaimed), vec![1]); + + // Now expect tracked to match running + let running = sorted(registry.running()); + let tracked = sorted(registry.tracked()); + + assert_eq!(running.len(), 2); + assert_eq!(get_metadata(&running), vec![2, 3]); + assert_eq!(tracked.len(), 2); + assert_eq!(get_metadata(&tracked), vec![2, 3]); + + // Wait for task3 to finish + let result3 = task3.await.unwrap(); + assert!(result3.is_ok()); + + assert_eq!(tracked[0].pending_futures(), 1); + assert_eq!(tracked[0].created_futures(), 2); + assert!(!tracked[0].is_complete()); + + // Trigger termination of task5 + running[1].cancel(); + + let result5 = task5.await.unwrap(); + assert!(result5.is_err()); + + let running = sorted(registry.running()); + let tracked = sorted(registry.tracked()); + + assert_eq!(running.len(), 1); + assert_eq!(get_metadata(&running), vec![2]); + assert_eq!(tracked.len(), 2); + assert_eq!(get_metadata(&tracked), vec![2, 3]); + + // Trigger termination of task4 + running[0].cancel(); + + let result4 = task4.await.unwrap(); + assert!(result4.is_err()); + + assert_eq!(running[0].pending_futures(), 0); + assert_eq!(running[0].created_futures(), 2); + assert!(running[0].is_complete()); + + let reclaimed = sorted(registry.reclaim()); + + assert_eq!(reclaimed.len(), 2); + assert_eq!(get_metadata(&reclaimed), vec![2, 3]); + assert_eq!(registry.tracked().len(), 0); + } + + // Use n+1 threads where n is the number of "blocking" tasks + // to prevent stalling the tokio executor + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_timing() { + let registry = TrackerRegistry::new(); + let (tracker1, registration1) = registry.register(1); + let (tracker2, registration2) = registry.register(2); + let (tracker3, registration3) = registry.register(3); + + let task1 = + tokio::spawn(tokio::time::sleep(Duration::from_millis(100)).track(registration1)); + let task2 = tokio::spawn( + async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration2), + ); + + let task3 = tokio::spawn( + async move { std::thread::sleep(Duration::from_millis(100)) } + .track(registration3.clone()), + ); + + let task4 = tokio::spawn( + async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration3), + ); + + task1.await.unwrap().unwrap(); + task2.await.unwrap().unwrap(); + task3.await.unwrap().unwrap(); + task4.await.unwrap().unwrap(); + + assert_eq!(tracker1.pending_futures(), 0); + assert_eq!(tracker2.pending_futures(), 0); + assert_eq!(tracker3.pending_futures(), 0); + + assert!(tracker1.is_complete()); + assert!(tracker2.is_complete()); + assert!(tracker3.is_complete()); + + assert_eq!(tracker2.created_futures(), 1); + assert_eq!(tracker2.created_futures(), 1); + assert_eq!(tracker3.created_futures(), 2); + + let assert_fuzzy = |actual: usize, expected: std::time::Duration| { + // Number of milliseconds of toleration + let epsilon = Duration::from_millis(10).as_nanos() as usize; + let expected = expected.as_nanos() as usize; + + assert!( + actual > expected.saturating_sub(epsilon), + "Expected {} got {}", + expected, + actual + ); + assert!( + actual < 
expected.saturating_add(epsilon), + "Expected {} got {}", + expected, + actual + ); + }; + + assert_fuzzy(tracker1.cpu_nanos(), Duration::from_millis(0)); + assert_fuzzy(tracker1.wall_nanos(), Duration::from_millis(100)); + assert_fuzzy(tracker2.cpu_nanos(), Duration::from_millis(100)); + assert_fuzzy(tracker2.wall_nanos(), Duration::from_millis(100)); + assert_fuzzy(tracker3.cpu_nanos(), Duration::from_millis(200)); + assert_fuzzy(tracker3.wall_nanos(), Duration::from_millis(100)); + } + + #[tokio::test] + async fn test_register_race() { + let registry = TrackerRegistry::new(); + let (_, registration) = registry.register(()); + + let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone())); + task1.await.unwrap().unwrap(); + + // Should only consider tasks complete once cannot register more Futures + let reclaimed = registry.reclaim(); + assert_eq!(reclaimed.len(), 0); + + let task2 = tokio::spawn(futures::future::ready(()).track(registration)); + task2.await.unwrap().unwrap(); + + let reclaimed = registry.reclaim(); + assert_eq!(reclaimed.len(), 1); + } + + fn sorted(mut input: Vec>) -> Vec> { + input.sort_unstable_by_key(|x| *x.metadata()); + input + } + + fn get_metadata(input: &[Tracker]) -> Vec { + let mut ret: Vec<_> = input.iter().map(|x| *x.metadata()).collect(); + ret.sort_unstable(); + ret } } diff --git a/server/src/tracker/future.rs b/server/src/tracker/future.rs new file mode 100644 index 0000000000..342a89adb2 --- /dev/null +++ b/server/src/tracker/future.rs @@ -0,0 +1,87 @@ +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::{future::BoxFuture, prelude::*}; +use pin_project::{pin_project, pinned_drop}; + +use super::{TrackerRegistration, TrackerState}; +use std::sync::Arc; + +/// An extension trait that provides `self.track(registration)` allowing +/// associating this future with a `TrackerRegistration` +pub trait TrackedFutureExt: Future { + fn track(self, registration: TrackerRegistration) -> TrackedFuture + where + Self: Sized, + { + let tracker = Arc::clone(®istration.state); + let token = tracker.cancel_token.clone(); + + tracker.created_futures.fetch_add(1, Ordering::Relaxed); + tracker.pending_futures.fetch_add(1, Ordering::Relaxed); + + // This must occur after the increment of pending_futures + std::mem::drop(registration); + + // The future returned by CancellationToken::cancelled borrows the token + // In order to ensure we get a future with a static lifetime + // we box them up together and let async work its magic + let abort = Box::pin(async move { token.cancelled().await }); + + TrackedFuture { + inner: self, + abort, + tracker, + } + } +} + +impl TrackedFutureExt for T where T: Future {} + +/// The `Future` returned by `TrackedFutureExt::track()` +/// Unregisters the future from the registered `TrackerRegistry` on drop +/// and provides the early termination functionality used by +/// `TrackerRegistry::terminate` +#[pin_project(PinnedDrop)] +#[allow(missing_debug_implementations)] +pub struct TrackedFuture { + #[pin] + inner: F, + #[pin] + abort: BoxFuture<'static, ()>, + tracker: Arc, +} + +impl Future for TrackedFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.as_mut().project().abort.poll(cx).is_ready() { + return Poll::Ready(Err(future::Aborted {})); + } + + let start = Instant::now(); + let poll = self.as_mut().project().inner.poll(cx); + let delta = start.elapsed().as_nanos() as usize; + + 
self.tracker.cpu_nanos.fetch_add(delta, Ordering::Relaxed); + + poll.map(Ok) + } +} + +#[pinned_drop] +impl PinnedDrop for TrackedFuture { + fn drop(self: Pin<&mut Self>) { + let state = &self.project().tracker; + + let wall_nanos = state.start_instant.elapsed().as_nanos() as usize; + + state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed); + + let previous = state.pending_futures.fetch_sub(1, Ordering::Release); + assert_ne!(previous, 0); + } +} diff --git a/server/src/tracker/registry.rs b/server/src/tracker/registry.rs new file mode 100644 index 0000000000..88028269c6 --- /dev/null +++ b/server/src/tracker/registry.rs @@ -0,0 +1,126 @@ +use super::{Tracker, TrackerRegistration}; +use hashbrown::HashMap; +use parking_lot::Mutex; +use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tracing::debug; + +/// Every future registered with a `TrackerRegistry` is assigned a unique +/// `TrackerId` +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct TrackerId(usize); + +impl FromStr for TrackerId { + type Err = std::num::ParseIntError; + + fn from_str(s: &str) -> Result { + Ok(Self(FromStr::from_str(s)?)) + } +} + +impl ToString for TrackerId { + fn to_string(&self) -> String { + self.0.to_string() + } +} + +/// Internal data stored by TrackerRegistry +#[derive(Debug)] +struct TrackerSlot { + tracker: Tracker, + watch: tokio::sync::watch::Sender, +} + +/// Allows tracking the lifecycle of futures registered by +/// `TrackedFutureExt::track` with an accompanying metadata payload of type T +/// +/// Additionally can trigger graceful cancellation of registered futures +#[derive(Debug)] +pub struct TrackerRegistry { + next_id: AtomicUsize, + trackers: Mutex>>, +} + +impl Default for TrackerRegistry { + fn default() -> Self { + Self { + next_id: AtomicUsize::new(0), + trackers: Default::default(), + } + } +} + +impl TrackerRegistry { + pub fn new() -> Self { + Default::default() + } + + /// Register a new tracker in the registry + pub fn register(&self, metadata: T) -> (Tracker, TrackerRegistration) { + let id = TrackerId(self.next_id.fetch_add(1, Ordering::Relaxed)); + let (sender, receiver) = tokio::sync::watch::channel(false); + let registration = TrackerRegistration::new(receiver); + + let tracker = Tracker { + id, + metadata: Arc::new(metadata), + state: Arc::clone(®istration.state), + }; + + self.trackers.lock().insert( + id, + TrackerSlot { + tracker: tracker.clone(), + watch: sender, + }, + ); + + (tracker, registration) + } + + /// Removes completed tasks from the registry and returns a list of those + /// removed + pub fn reclaim(&self) -> Vec> { + self.trackers + .lock() + .drain_filter(|_, v| v.tracker.is_complete()) + .map(|(_, v)| { + if let Err(error) = v.watch.send(true) { + // As we hold a reference to the Tracker here, this should be impossible + debug!(?error, "failed to publish tracker completion") + } + v.tracker + }) + .collect() + } +} + +impl TrackerRegistry { + pub fn get(&self, id: TrackerId) -> Option> { + self.trackers.lock().get(&id).map(|x| x.tracker.clone()) + } + + /// Returns a list of trackers, including those that are no longer running + pub fn tracked(&self) -> Vec> { + self.trackers + .lock() + .iter() + .map(|(_, v)| v.tracker.clone()) + .collect() + } + + /// Returns a list of active trackers + pub fn running(&self) -> Vec> { + self.trackers + .lock() + .iter() + .filter_map(|(_, v)| { + if !v.tracker.is_complete() { + return Some(v.tracker.clone()); + } + None + }) + .collect() + } +} 
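Putting the registry pieces together: `running()` returns the trackers that still have outstanding futures, `tracked()` returns everything still registered, and `reclaim()` drains and returns the trackers whose futures have all completed, which is how the server's background worker retires finished jobs. A rough sketch of such a loop, assuming a tokio runtime; the `reclaim_loop` helper and the `String` metadata type are illustrative only:

```rust
use std::time::Duration;

use server::tracker::TrackerRegistry;

// Sketch: periodically retire completed trackers, mirroring what the
// server background worker does with its `TrackerRegistry<Job>`.
async fn reclaim_loop(registry: TrackerRegistry<String>) {
    let mut interval = tokio::time::interval(Duration::from_secs(1));

    loop {
        interval.tick().await;

        // `reclaim` removes trackers that are complete (no pending futures
        // and no outstanding registrations) and hands them back for logging
        // or archival.
        for finished in registry.reclaim() {
            println!(
                "job {} finished: {}",
                finished.id().to_string(),
                finished.metadata()
            );
        }
    }
}
```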
diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 45b0c57bb9..f8f41b6a60 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -130,7 +130,6 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()> info!(bind_address=?grpc_bind_addr, "gRPC server listening"); // Construct and start up HTTP server - let router_service = http::router_service(Arc::clone(&app_server)); let bind_addr = config.http_bind_address; @@ -142,8 +141,11 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()> let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN"); info!(git_hash, "InfluxDB IOx server ready"); - // Wait for both the servers to complete - let (grpc_server, server) = futures::future::join(grpc_server, http_server).await; + // Get IOx background worker task + let app = app_server.background_worker(); + + // TODO: Fix shutdown handling (#827) + let (grpc_server, server, _) = futures::future::join3(grpc_server, http_server, app).await; grpc_server.context(ServingRPC)?; server.context(ServingHttp)?; diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 7998674235..385071ffb9 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -11,6 +11,7 @@ use server::{ConnectionManager, Server}; pub mod error; mod flight; mod management; +mod operations; mod storage; mod testing; mod write; @@ -53,7 +54,8 @@ where .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(Arc::clone(&server))) .add_service(write::make_server(Arc::clone(&server))) - .add_service(management::make_server(server)) + .add_service(management::make_server(Arc::clone(&server))) + .add_service(operations::make_server(server)) .serve_with_incoming(stream) .await .context(ServerError {}) diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 570d57875a..fef16299d6 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -127,6 +127,16 @@ where Ok(Response::new(ListChunksResponse { chunks })) } + async fn create_dummy_job( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let slot = self.server.spawn_dummy_job(request.nanos); + let operation = Some(super::operations::encode_tracker(slot)?); + Ok(Response::new(CreateDummyJobResponse { operation })) + } + async fn list_remotes( &self, _: Request, diff --git a/src/influxdb_ioxd/rpc/operations.rs b/src/influxdb_ioxd/rpc/operations.rs new file mode 100644 index 0000000000..9bacc595ae --- /dev/null +++ b/src/influxdb_ioxd/rpc/operations.rs @@ -0,0 +1,182 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use bytes::BytesMut; +use prost::Message; +use tonic::Response; +use tracing::debug; + +use data_types::job::Job; +use generated_types::google::FieldViolationExt; +use generated_types::{ + google::{ + longrunning::*, + protobuf::{Any, Empty}, + rpc::Status, + FieldViolation, InternalError, NotFound, + }, + influxdata::iox::management::v1 as management, +}; +use server::{ + tracker::{Tracker, TrackerId}, + ConnectionManager, Server, +}; +use std::convert::TryInto; + +/// Implementation of the write service +struct OperationsService { + server: Arc>, +} + +pub fn encode_tracker(tracker: Tracker) -> Result { + let id = tracker.id(); + let is_complete = tracker.is_complete(); + let is_cancelled = tracker.is_cancelled(); + + let mut buffer = BytesMut::new(); + management::OperationMetadata { + cpu_nanos: tracker.cpu_nanos() as _, + wall_nanos: 
tracker.wall_nanos() as _, + task_count: tracker.created_futures() as _, + pending_count: tracker.pending_futures() as _, + job: Some(tracker.metadata().clone().into()), + } + .encode(&mut buffer) + .map_err(|error| { + debug!(?error, "Unexpected error"); + InternalError {} + })?; + + let metadata = Any { + type_url: "type.googleapis.com/influxdata.iox.management.v1.OperationMetadata".to_string(), + value: buffer.freeze(), + }; + + let result = match (is_complete, is_cancelled) { + (true, true) => Some(operation::Result::Error(Status { + code: tonic::Code::Cancelled as _, + message: "Job cancelled".to_string(), + details: vec![], + })), + + (true, false) => Some(operation::Result::Response(Any { + type_url: "type.googleapis.com/google.protobuf.Empty".to_string(), + value: Default::default(), // TODO: Verify this is correct + })), + + _ => None, + }; + + Ok(Operation { + name: id.to_string(), + metadata: Some(metadata), + done: is_complete, + result, + }) +} + +fn get_tracker(server: &Server, tracker: String) -> Result, tonic::Status> +where + M: ConnectionManager, +{ + let tracker_id = tracker.parse::().map_err(|e| FieldViolation { + field: "name".to_string(), + description: e.to_string(), + })?; + + let tracker = server.get_job(tracker_id).ok_or(NotFound { + resource_type: "job".to_string(), + resource_name: tracker, + ..Default::default() + })?; + + Ok(tracker) +} + +#[tonic::async_trait] +impl operations_server::Operations for OperationsService +where + M: ConnectionManager + Send + Sync + Debug + 'static, +{ + async fn list_operations( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + // TODO: Support pagination + let operations: Result, _> = self + .server + .tracked_jobs() + .into_iter() + .map(encode_tracker) + .collect(); + + Ok(Response::new(ListOperationsResponse { + operations: operations?, + next_page_token: Default::default(), + })) + } + + async fn get_operation( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let tracker = get_tracker(self.server.as_ref(), request.name)?; + + Ok(Response::new(encode_tracker(tracker)?)) + } + + async fn delete_operation( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented( + "IOx does not support operation deletion", + )) + } + + async fn cancel_operation( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + + let tracker = get_tracker(self.server.as_ref(), request.name)?; + tracker.cancel(); + + Ok(Response::new(Empty {})) + } + + async fn wait_operation( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + // This should take into account the context deadline timeout + // Unfortunately these are currently stripped by tonic + // - https://github.com/hyperium/tonic/issues/75 + + let request = request.into_inner(); + + let tracker = get_tracker(self.server.as_ref(), request.name)?; + if let Some(timeout) = request.timeout { + let timeout = timeout.try_into().field("timeout")?; + + // Timeout is not an error so suppress it + let _ = tokio::time::timeout(timeout, tracker.join()).await; + } else { + tracker.join().await; + } + + Ok(Response::new(encode_tracker(tracker)?)) + } +} + +/// Instantiate the write service +pub fn make_server( + server: Arc>, +) -> operations_server::OperationsServer +where + M: ConnectionManager + Send + Sync + Debug + 'static, +{ + operations_server::OperationsServer::new(OperationsService { server }) +} From 
334fb149b1ba9351799b61e7d78de7817700cba3 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Fri, 12 Mar 2021 18:30:49 +0100 Subject: [PATCH 6/9] feat: Rename server run command to just run Closes #976 --- Dockerfile | 2 +- README.md | 4 +- docker/Dockerfile.iox | 2 +- docs/env.example | 2 +- docs/testing.md | 4 +- src/commands/logging.rs | 4 +- src/commands/run.rs | 341 +++++++++++++++++++++++++++++++++ src/commands/server.rs | 332 +------------------------------- src/influxdb_ioxd.rs | 28 +-- src/main.rs | 21 +- tests/common/server_fixture.rs | 2 - 11 files changed, 381 insertions(+), 361 deletions(-) create mode 100644 src/commands/run.rs diff --git a/Dockerfile b/Dockerfile index 84535ce2d8..46b9d2a192 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,4 +37,4 @@ EXPOSE 8080 8082 ENTRYPOINT ["/usr/bin/influxdb_iox"] -CMD ["server", "run"] +CMD ["run"] diff --git a/README.md b/README.md index 6decf9931a..1468a42a4c 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,7 @@ takes its configuration as environment variables. You can see a list of the current configuration values by running `influxdb_iox --help`, as well as the specific subcommand config options such as `influxdb_iox -server --help`. +run --help`. Should you desire specifying config via a file, you can do so using a `.env` formatted file in the working directory. You can use the @@ -175,7 +175,7 @@ cargo build --release which will create the corresponding binary in `target/release`: ```shell -./target/release/influxdb_iox server +./target/release/influxdb_iox run ``` Similarly, you can do this in one step with: diff --git a/docker/Dockerfile.iox b/docker/Dockerfile.iox index f75d76ec19..ffa68a7092 100644 --- a/docker/Dockerfile.iox +++ b/docker/Dockerfile.iox @@ -21,4 +21,4 @@ EXPOSE 8080 8082 ENTRYPOINT ["influxdb_iox"] -CMD ["server", "run"] +CMD ["run"] diff --git a/docs/env.example b/docs/env.example index ed4f28a5c6..dc250a3530 100644 --- a/docs/env.example +++ b/docs/env.example @@ -7,7 +7,7 @@ # The full list of available configuration values can be found by in # the command line help (e.g. `env: INFLUXDB_IOX_DB_DIR=`): # -# ./influxdb_iox server --help +# ./influxdb_iox run --help # # # The identifier for the server. Used for writing to object storage and as diff --git a/docs/testing.md b/docs/testing.md index 30c936dfd9..9e424127ec 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -21,14 +21,14 @@ of the object stores, the relevant tests will run. ### Configuration differences when running the tests -When running `influxdb_iox server`, you can pick one object store to use. When running the tests, +When running `influxdb_iox run`, you can pick one object store to use. When running the tests, you can run them against all the possible object stores. There's still only one `INFLUXDB_IOX_BUCKET` variable, though, so that will set the bucket name for all configured object stores. Use the same bucket name when setting up the different services. Other than possibly configuring multiple object stores, configuring the tests to use the object store services is the same as configuring the server to use an object store service. See the output -of `influxdb_iox server --help` for instructions. +of `influxdb_iox run --help` for instructions. 
## InfluxDB IOx Client diff --git a/src/commands/logging.rs b/src/commands/logging.rs index bc0386431f..0cd4ae0de3 100644 --- a/src/commands/logging.rs +++ b/src/commands/logging.rs @@ -2,7 +2,7 @@ use tracing_subscriber::{prelude::*, EnvFilter}; -use super::server::{LogFormat, RunConfig}; +use super::run::{Config, LogFormat}; /// Handles setting up logging levels #[derive(Debug)] @@ -81,7 +81,7 @@ impl LoggingLevel { /// Configures logging and tracing, based on the configuration /// values, for the IOx server (the whole enchalada) - pub fn setup_logging(&self, config: &RunConfig) -> Option { + pub fn setup_logging(&self, config: &Config) -> Option { // Copy anything from the config to the rust log environment self.set_rust_log_if_needed(config.rust_log.clone()); diff --git a/src/commands/run.rs b/src/commands/run.rs new file mode 100644 index 0000000000..ac33042f14 --- /dev/null +++ b/src/commands/run.rs @@ -0,0 +1,341 @@ +//! Implementation of command line option for running server + +use crate::commands::logging::LoggingLevel; +use crate::influxdb_ioxd; +use clap::arg_enum; +use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf}; +use structopt::StructOpt; +use thiserror::Error; + +/// The default bind address for the HTTP API. +pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080"; + +/// The default bind address for the gRPC. +pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082"; + +/// The AWS region to use for Amazon S3 based object storage if none is +/// specified. +pub const FALLBACK_AWS_REGION: &str = "us-east-1"; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + ServerError(#[from] influxdb_ioxd::Error), +} + +pub type Result = std::result::Result; + +#[derive(Debug, StructOpt)] +#[structopt( + name = "run", + about = "Runs in server mode", + long_about = "Run the IOx server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. + +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + /// This controls the IOx server logging level, as described in + /// https://crates.io/crates/env_logger. + /// + /// Levels for different modules can be specified as well. For example + /// `debug,hyper::proto::h1=info` specifies debug logging for all modules + /// except for the `hyper::proto::h1' module which will only display info + /// level logging. + #[structopt(long = "--log", env = "RUST_LOG")] + pub rust_log: Option, + + /// Log message format. Can be one of: + /// + /// "rust" (default) + /// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt) + #[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")] + pub log_format: Option, + + /// This sets logging up with a pre-configured set of convenient log levels. + /// + /// -v means 'info' log levels + /// -vv means 'verbose' log level (with the exception of some particularly + /// low level libraries) + /// + /// This option is ignored if --log / RUST_LOG are set + #[structopt( + short = "-v", + long = "--verbose", + multiple = true, + takes_value = false, + parse(from_occurrences) + )] + pub verbose_count: u64, + + /// The identifier for the server. 
+    ///
+    /// Used for writing to object storage and as an identifier that is added to
+    /// replicated writes, WAL segments and Chunks. Must be unique in a group of
+    /// connected or semi-connected IOx servers. Must be a number that can be
+    /// represented by a 32-bit unsigned integer.
+    #[structopt(long = "--writer-id", env = "INFLUXDB_IOX_ID")]
+    pub writer_id: Option<u32>,
+
+    /// The address on which IOx will serve HTTP API requests.
+    #[structopt(
+        long = "--api-bind",
+        env = "INFLUXDB_IOX_BIND_ADDR",
+        default_value = DEFAULT_API_BIND_ADDR,
+        parse(try_from_str = parse_socket_addr),
+    )]
+    pub http_bind_address: SocketAddr,
+
+    /// The address on which IOx will serve Storage gRPC API requests.
+    #[structopt(
+        long = "--grpc-bind",
+        env = "INFLUXDB_IOX_GRPC_BIND_ADDR",
+        default_value = DEFAULT_GRPC_BIND_ADDR,
+        parse(try_from_str = parse_socket_addr),
+    )]
+    pub grpc_bind_address: SocketAddr,
+
+    /// The location InfluxDB IOx will use to store files locally.
+    #[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
+    pub database_directory: Option<PathBuf>,
+
+    #[structopt(
+        long = "--object-store",
+        env = "INFLUXDB_IOX_OBJECT_STORE",
+        possible_values = &ObjectStore::variants(),
+        case_insensitive = true,
+        long_help = r#"Which object storage to use. If not specified, defaults to memory.
+
+Possible values (case insensitive):
+
+* memory (default): Effectively no object persistence.
+* file: Stores objects in the local filesystem. Must also set `--data-dir`.
+* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
+   possibly `--aws-default-region`.
+* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
+* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
+   and `--azure-storage-access-key`.
+        "#,
+    )]
+    pub object_store: Option<ObjectStore>,
+
+    /// Name of the bucket to use for the object store. Must also set
+    /// `--object-store` to a cloud object storage to have any effect.
+    ///
+    /// If using Google Cloud Storage for the object store, this item as well
+    /// as `--google-service-account` must be set.
+    ///
+    /// If using S3 for the object store, must set this item as well
+    /// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
+    /// `--aws-default-region` if not using the fallback region.
+    ///
+    /// If using Azure for the object store, set this item to the name of a
+    /// container you've created in the associated storage account, under
+    /// Blob Service > Containers. Must also set `--azure-storage-account` and
+    /// `--azure-storage-access-key`.
+    #[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
+    pub bucket: Option<String>,
+
+    /// When using Amazon S3 as the object store, set this to an access key that
+    /// has permission to read from and write to the specified S3 bucket.
+    ///
+    /// Must also set `--object-store=s3`, `--bucket`, and
+    /// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
+    /// using the fallback region.
+    ///
+    /// Prefer the environment variable over the command line flag in shared
+    /// environments.
+    #[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
+    pub aws_access_key_id: Option<String>,
+
+    /// When using Amazon S3 as the object store, set this to the secret access
+    /// key that goes with the specified access key ID.
+    ///
+    /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
+    /// Can also set `--aws-default-region` if not using the fallback region.
+    ///
+    /// Prefer the environment variable over the command line flag in shared
+    /// environments.
+    #[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
+    pub aws_secret_access_key: Option<String>,
+
+    /// When using Amazon S3 as the object store, set this to the region
+    /// that goes with the specified bucket if different from the fallback
+    /// value.
+    ///
+    /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
+    /// and `--aws-secret-access-key`.
+    #[structopt(
+        long = "--aws-default-region",
+        env = "AWS_DEFAULT_REGION",
+        default_value = FALLBACK_AWS_REGION,
+    )]
+    pub aws_default_region: String,
+
+    /// When using Google Cloud Storage as the object store, set this to the
+    /// path to the JSON file that contains the Google credentials.
+    ///
+    /// Must also set `--object-store=google` and `--bucket`.
+    #[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
+    pub google_service_account: Option<String>,
+
+    /// When using Microsoft Azure as the object store, set this to the
+    /// name you see when going to All Services > Storage accounts > [name].
+    ///
+    /// Must also set `--object-store=azure`, `--bucket`, and
+    /// `--azure-storage-access-key`.
+    #[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
+    pub azure_storage_account: Option<String>,
+
+    /// When using Microsoft Azure as the object store, set this to one of the
+    /// Key values in the Storage account's Settings > Access keys.
+    ///
+    /// Must also set `--object-store=azure`, `--bucket`, and
+    /// `--azure-storage-account`.
+    ///
+    /// Prefer the environment variable over the command line flag in shared
+    /// environments.
+    #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
+    pub azure_storage_access_key: Option<String>,
+
+    /// If set, Jaeger traces are emitted to this host
+    /// using the OpenTelemetry tracer.
+    ///
+    /// NOTE: The OpenTelemetry agent CAN ONLY be
+    /// configured using environment variables. It CAN NOT be configured
+    /// using the command line at this time. Some useful variables:
+    ///
+    /// * OTEL_SERVICE_NAME: emitter service name (iox by default)
+    /// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector
+    /// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector.
+    ///
+    /// The entire list of variables can be found in
+    /// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter
+    #[structopt(
+        long = "--oetl_exporter_jaeger_agent",
+        env = "OTEL_EXPORTER_JAEGER_AGENT_HOST"
+    )]
+    pub jaeger_host: Option<String>,
+}
+
+pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> {
+    Ok(influxdb_ioxd::main(logging_level, config).await?)
+}
+
+fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
+    let mut addrs = s.to_socket_addrs()?;
+    // when name resolution fails, to_socket_addrs returns a validation error
+    // so generally there is at least one result address, unless the resolver is
+    // drunk.
+    Ok(addrs
+        .next()
+        .expect("name resolution should return at least one address"))
+}
+
+arg_enum! {
+    #[derive(Debug, Copy, Clone, PartialEq)]
+    pub enum ObjectStore {
+        Memory,
+        File,
+        S3,
+        Google,
+        Azure,
+    }
+}
+
+/// How to format output logging messages
+#[derive(Debug, Clone, Copy)]
+pub enum LogFormat {
+    /// Default formatted logging
+    ///
+    /// Example:
+    /// ```
+    /// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd"
+    /// ```
+    Rust,
+
+    /// Use the (somewhat pretentiously named) Heroku / logfmt formatted output
+    /// format
+    ///
+    /// Example:
+    /// ```
+    /// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage
+    /// ```
+    LogFmt,
+}
+
+impl Default for LogFormat {
+    fn default() -> Self {
+        Self::Rust
+    }
+}
+
+impl std::str::FromStr for LogFormat {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "rust" => Ok(Self::Rust),
+            "logfmt" => Ok(Self::LogFmt),
+            _ => Err(format!(
+                "Invalid log format '{}'. Valid options: rust, logfmt",
+                s
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+
+    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
+
+    fn to_vec(v: &[&str]) -> Vec<String> {
+        v.iter().map(|s| s.to_string()).collect()
+    }
+
+    #[test]
+    fn test_socketaddr() -> Result<(), clap::Error> {
+        let c = Config::from_iter_safe(
+            to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter(),
+        )?;
+        assert_eq!(
+            c.http_bind_address,
+            SocketAddr::from(([127, 0, 0, 1], 1234))
+        );
+
+        let c = Config::from_iter_safe(
+            to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter(),
+        )?;
+        // depending on where the test runs, localhost will either resolve to an ipv4 or
+        // an ipv6 addr.
+        match c.http_bind_address {
+            SocketAddr::V4(so) => {
+                assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
+            }
+            SocketAddr::V6(so) => assert_eq!(
+                so,
+                SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0)
+            ),
+        };
+
+        assert_eq!(
+            Config::from_iter_safe(
+                to_vec(&["server", "--api-bind", "!@INv_a1d(ad0/resp_!"]).into_iter(),
+            )
+            .map_err(|e| e.kind)
+            .expect_err("must fail"),
+            clap::ErrorKind::ValueValidation
+        );
+
+        Ok(())
+    }
+}
diff --git a/src/commands/server.rs b/src/commands/server.rs
index d270bf2969..3bad401af3 100644
--- a/src/commands/server.rs
+++ b/src/commands/server.rs
@@ -1,27 +1,12 @@
 //! Implementation of command line option for manipulating and showing server
 //! config
 
-use crate::commands::{logging::LoggingLevel, server_remote};
-use crate::influxdb_ioxd;
-use clap::arg_enum;
-use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
+use crate::commands::server_remote;
 use structopt::StructOpt;
 use thiserror::Error;
 
-/// The default bind address for the HTTP API.
-pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
-
-/// The default bind address for the gRPC.
-pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
-
-/// The AWS region to use for Amazon S3 based object storage if none is
-/// specified.
-pub const FALLBACK_AWS_REGION: &str = "us-east-1"; - #[derive(Debug, Error)] pub enum Error { - #[error("Run: {0}")] - ServerError(#[from] influxdb_ioxd::Error), #[error("Remote: {0}")] RemoteError(#[from] server_remote::Error), } @@ -31,324 +16,11 @@ pub type Result = std::result::Result; #[derive(Debug, StructOpt)] #[structopt(name = "server", about = "IOx server commands")] pub enum Config { - Run(RunConfig), Remote(crate::commands::server_remote::Config), } -#[derive(Debug, StructOpt)] -#[structopt( - name = "run", - about = "Runs in server mode", - long_about = "Run the IOx server.\n\nThe configuration options below can be \ - set either with the command line flags or with the specified environment \ - variable. If there is a file named '.env' in the current working directory, \ - it is sourced before loading the configuration. - -Configuration is loaded from the following sources (highest precedence first): - - command line arguments - - user set environment variables - - .env file contents - - pre-configured default values" -)] -pub struct RunConfig { - /// This controls the IOx server logging level, as described in - /// https://crates.io/crates/env_logger. - /// - /// Levels for different modules can be specified as well. For example - /// `debug,hyper::proto::h1=info` specifies debug logging for all modules - /// except for the `hyper::proto::h1' module which will only display info - /// level logging. - #[structopt(long = "--log", env = "RUST_LOG")] - pub rust_log: Option, - - /// Log message format. Can be one of: - /// - /// "rust" (default) - /// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt) - #[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")] - pub log_format: Option, - - /// This sets logging up with a pre-configured set of convenient log levels. - /// - /// -v means 'info' log levels - /// -vv means 'verbose' log level (with the exception of some particularly - /// low level libraries) - /// - /// This option is ignored if --log / RUST_LOG are set - #[structopt( - short = "-v", - long = "--verbose", - multiple = true, - takes_value = false, - parse(from_occurrences) - )] - pub verbose_count: u64, - - /// The identifier for the server. - /// - /// Used for writing to object storage and as an identifier that is added to - /// replicated writes, WAL segments and Chunks. Must be unique in a group of - /// connected or semi-connected IOx servers. Must be a number that can be - /// represented by a 32-bit unsigned integer. - #[structopt(long = "--writer-id", env = "INFLUXDB_IOX_ID")] - pub writer_id: Option, - - /// The address on which IOx will serve HTTP API requests. - #[structopt( - long = "--api-bind", - env = "INFLUXDB_IOX_BIND_ADDR", - default_value = DEFAULT_API_BIND_ADDR, - parse(try_from_str = parse_socket_addr), - )] - pub http_bind_address: SocketAddr, - - /// The address on which IOx will serve Storage gRPC API requests. - #[structopt( - long = "--grpc-bind", - env = "INFLUXDB_IOX_GRPC_BIND_ADDR", - default_value = DEFAULT_GRPC_BIND_ADDR, - parse(try_from_str = parse_socket_addr), - )] - pub grpc_bind_address: SocketAddr, - - /// The location InfluxDB IOx will use to store files locally. - #[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")] - pub database_directory: Option, - - #[structopt( - long = "--object-store", - env = "INFLUXDB_IOX_OBJECT_STORE", - possible_values = &ObjectStore::variants(), - case_insensitive = true, - long_help = r#"Which object storage to use. If not specified, defaults to memory. 
- -Possible values (case insensitive): - -* memory (default): Effectively no object persistence. -* file: Stores objects in the local filesystem. Must also set `--data-dir`. -* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and - possibly `--aws-default-region`. -* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`. -* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`, - and `--azure-storage-access-key`. - "#, - )] - pub object_store: Option, - - /// Name of the bucket to use for the object store. Must also set - /// `--object-store` to a cloud object storage to have any effect. - /// - /// If using Google Cloud Storage for the object store, this item as well - /// as `--google-service-account` must be set. - /// - /// If using S3 for the object store, must set this item as well - /// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set - /// `--aws-default-region` if not using the fallback region. - /// - /// If using Azure for the object store, set this item to the name of a - /// container you've created in the associated storage account, under - /// Blob Service > Containers. Must also set `--azure-storage-account` and - /// `--azure-storage-access-key`. - #[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")] - pub bucket: Option, - - /// When using Amazon S3 as the object store, set this to an access key that - /// has permission to read from and write to the specified S3 bucket. - /// - /// Must also set `--object-store=s3`, `--bucket`, and - /// `--aws-secret-access-key`. Can also set `--aws-default-region` if not - /// using the fallback region. - /// - /// Prefer the environment variable over the command line flag in shared - /// environments. - #[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")] - pub aws_access_key_id: Option, - - /// When using Amazon S3 as the object store, set this to the secret access - /// key that goes with the specified access key ID. - /// - /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`. - /// Can also set `--aws-default-region` if not using the fallback region. - /// - /// Prefer the environment variable over the command line flag in shared - /// environments. - #[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")] - pub aws_secret_access_key: Option, - - /// When using Amazon S3 as the object store, set this to the region - /// that goes with the specified bucket if different from the fallback - /// value. - /// - /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`, - /// and `--aws-secret-access-key`. - #[structopt( - long = "--aws-default-region", - env = "AWS_DEFAULT_REGION", - default_value = FALLBACK_AWS_REGION, - )] - pub aws_default_region: String, - - /// When using Google Cloud Storage as the object store, set this to the - /// path to the JSON file that contains the Google credentials. - /// - /// Must also set `--object-store=google` and `--bucket`. - #[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")] - pub google_service_account: Option, - - /// When using Microsoft Azure as the object store, set this to the - /// name you see when going to All Services > Storage accounts > [name]. - /// - /// Must also set `--object-store=azure`, `--bucket`, and - /// `--azure-storage-access-key`. 
- #[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")] - pub azure_storage_account: Option, - - /// When using Microsoft Azure as the object store, set this to one of the - /// Key values in the Storage account's Settings > Access keys. - /// - /// Must also set `--object-store=azure`, `--bucket`, and - /// `--azure-storage-account`. - /// - /// Prefer the environment variable over the command line flag in shared - /// environments. - #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")] - pub azure_storage_access_key: Option, - - /// If set, Jaeger traces are emitted to this host - /// using the OpenTelemetry tracer. - /// - /// NOTE: The OpenTelemetry agent CAN ONLY be - /// configured using environment variables. It CAN NOT be configured - /// using the command line at this time. Some useful variables: - /// - /// * OTEL_SERVICE_NAME: emitter service name (iox by default) - /// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector - /// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector. - /// - /// The entire list of variables can be found in - /// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter - #[structopt( - long = "--oetl_exporter_jaeger_agent", - env = "OTEL_EXPORTER_JAEGER_AGENT_HOST" - )] - pub jaeger_host: Option, -} - -pub async fn command(logging_level: LoggingLevel, url: String, config: Config) -> Result<()> { +pub async fn command(url: String, config: Config) -> Result<()> { match config { - Config::Run(config) => Ok(influxdb_ioxd::main(logging_level, config).await?), Config::Remote(config) => Ok(server_remote::command(url, config).await?), } } - -fn parse_socket_addr(s: &str) -> std::io::Result { - let mut addrs = s.to_socket_addrs()?; - // when name resolution fails, to_socket_address returns a validation error - // so generally there is at least one result address, unless the resolver is - // drunk. - Ok(addrs - .next() - .expect("name resolution should return at least one address")) -} - -arg_enum! { - #[derive(Debug, Copy, Clone, PartialEq)] - pub enum ObjectStore { - Memory, - File, - S3, - Google, - Azure, - } -} - -/// How to format output logging messages -#[derive(Debug, Clone, Copy)] -pub enum LogFormat { - /// Default formatted logging - /// - /// Example: - /// ``` - /// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd" - /// ``` - Rust, - - /// Use the (somwhat pretentiously named) Heroku / logfmt formatted output - /// format - /// - /// Example: - /// ``` - /// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage - /// ``` - LogFmt, -} - -impl Default for LogFormat { - fn default() -> Self { - Self::Rust - } -} - -impl std::str::FromStr for LogFormat { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "rust" => Ok(Self::Rust), - "logfmt" => Ok(Self::LogFmt), - _ => Err(format!( - "Invalid log format '{}'. 
Valid options: rust, logfmt", - s - )), - } - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}; - - fn to_vec(v: &[&str]) -> Vec { - v.iter().map(|s| s.to_string()).collect() - } - - #[test] - fn test_socketaddr() -> Result<(), clap::Error> { - let c = RunConfig::from_iter_safe( - to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter(), - )?; - assert_eq!( - c.http_bind_address, - SocketAddr::from(([127, 0, 0, 1], 1234)) - ); - - let c = RunConfig::from_iter_safe( - to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter(), - )?; - // depending on where the test runs, localhost will either resolve to a ipv4 or - // an ipv6 addr. - match c.http_bind_address { - SocketAddr::V4(so) => { - assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234)) - } - SocketAddr::V6(so) => assert_eq!( - so, - SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0) - ), - }; - - assert_eq!( - RunConfig::from_iter_safe( - to_vec(&["server", "--api-bind", "!@INv_a1d(ad0/resp_!"]).into_iter(), - ) - .map_err(|e| e.kind) - .expect_err("must fail"), - clap::ErrorKind::ValueValidation - ); - - Ok(()) - } -} diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index f8f41b6a60..0ff31ec20c 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -1,6 +1,6 @@ use crate::commands::{ logging::LoggingLevel, - server::{ObjectStore as ObjStoreOpt, RunConfig}, + run::{Config, ObjectStore as ObjStoreOpt}, }; use hyper::Server; use object_store::{ @@ -73,7 +73,7 @@ pub type Result = std::result::Result; /// /// The logging_level passed in is the global setting (e.g. if -v or /// -vv was passed in before 'server') -pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()> { +pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { // Handle the case if -v/-vv is specified both before and after the server // command let logging_level = logging_level.combine(LoggingLevel::new(config.verbose_count)); @@ -155,10 +155,10 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()> Ok(()) } -impl TryFrom<&RunConfig> for ObjectStore { +impl TryFrom<&Config> for ObjectStore { type Error = Error; - fn try_from(config: &RunConfig) -> Result { + fn try_from(config: &Config) -> Result { match config.object_store { Some(ObjStoreOpt::Memory) | None => { Ok(Self::new_in_memory(object_store::memory::InMemory::new())) @@ -285,7 +285,7 @@ mod tests { #[test] fn default_object_store_is_memory() { - let config = RunConfig::from_iter_safe(&["server"]).unwrap(); + let config = Config::from_iter_safe(&["server"]).unwrap(); let object_store = ObjectStore::try_from(&config).unwrap(); @@ -297,7 +297,7 @@ mod tests { #[test] fn explicitly_set_object_store_to_memory() { - let config = RunConfig::from_iter_safe(&["server", "--object-store", "memory"]).unwrap(); + let config = Config::from_iter_safe(&["server", "--object-store", "memory"]).unwrap(); let object_store = ObjectStore::try_from(&config).unwrap(); @@ -309,7 +309,7 @@ mod tests { #[test] fn valid_s3_config() { - let config = RunConfig::from_iter_safe(&[ + let config = Config::from_iter_safe(&[ "server", "--object-store", "s3", @@ -332,7 +332,7 @@ mod tests { #[test] fn s3_config_missing_params() { - let config = RunConfig::from_iter_safe(&["server", "--object-store", "s3"]).unwrap(); + let config = Config::from_iter_safe(&["server", "--object-store", "s3"]).unwrap(); let err = 
ObjectStore::try_from(&config).unwrap_err().to_string(); @@ -345,7 +345,7 @@ mod tests { #[test] fn valid_google_config() { - let config = RunConfig::from_iter_safe(&[ + let config = Config::from_iter_safe(&[ "server", "--object-store", "google", @@ -366,7 +366,7 @@ mod tests { #[test] fn google_config_missing_params() { - let config = RunConfig::from_iter_safe(&["server", "--object-store", "google"]).unwrap(); + let config = Config::from_iter_safe(&["server", "--object-store", "google"]).unwrap(); let err = ObjectStore::try_from(&config).unwrap_err().to_string(); @@ -379,7 +379,7 @@ mod tests { #[test] fn valid_azure_config() { - let config = RunConfig::from_iter_safe(&[ + let config = Config::from_iter_safe(&[ "server", "--object-store", "azure", @@ -402,7 +402,7 @@ mod tests { #[test] fn azure_config_missing_params() { - let config = RunConfig::from_iter_safe(&["server", "--object-store", "azure"]).unwrap(); + let config = Config::from_iter_safe(&["server", "--object-store", "azure"]).unwrap(); let err = ObjectStore::try_from(&config).unwrap_err().to_string(); @@ -417,7 +417,7 @@ mod tests { fn valid_file_config() { let root = TempDir::new().unwrap(); - let config = RunConfig::from_iter_safe(&[ + let config = Config::from_iter_safe(&[ "server", "--object-store", "file", @@ -436,7 +436,7 @@ mod tests { #[test] fn file_config_missing_params() { - let config = RunConfig::from_iter_safe(&["server", "--object-store", "file"]).unwrap(); + let config = Config::from_iter_safe(&["server", "--object-store", "file"]).unwrap(); let err = ObjectStore::try_from(&config).unwrap_err().to_string(); diff --git a/src/main.rs b/src/main.rs index 3d7e10b201..48ade670df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,7 @@ mod commands { mod input; pub mod logging; pub mod meta; + pub mod run; pub mod server; pub mod server_remote; pub mod stats; @@ -46,13 +47,13 @@ Examples: influxdb_iox # Display all server settings - influxdb_iox server --help + influxdb_iox run --help # Run the InfluxDB IOx server with extra verbose logging - influxdb_iox -v + influxdb_iox run -v # Run InfluxDB IOx with full debug logging specified with RUST_LOG - RUST_LOG=debug influxdb_iox + RUST_LOG=debug influxdb_iox run # converts line protocol formatted data in temperature.lp to out.parquet influxdb_iox convert temperature.lp out.parquet @@ -110,9 +111,10 @@ enum Command { input: String, }, Database(commands::database::Config), - Stats(commands::stats::Config), // Clippy recommended boxing this variant because it's much larger than the others - Server(Box), + Run(Box), + Stats(commands::stats::Config), + Server(commands::server::Config), Writer(commands::writer::Config), } @@ -184,9 +186,16 @@ fn main() -> Result<(), std::io::Error> { } } Command::Server(config) => { + logging_level.setup_basic_logging(); + if let Err(e) = commands::server::command(host, config).await { + eprintln!("Server command failed: {}", e); + std::process::exit(ReturnCode::Failure as _) + } + } + Command::Run(config) => { // Note don't set up basic logging here, different logging rules apply in server // mode - if let Err(e) = commands::server::command(logging_level, host, *config).await { + if let Err(e) = commands::run::command(logging_level, *config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index 0b73a2bda2..892c92ca34 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -225,7 +225,6 
@@ impl TestServer { let server_process = Command::cargo_bin("influxdb_iox") .unwrap() - .arg("server") .arg("run") // Can enable for debugging //.arg("-vv") @@ -251,7 +250,6 @@ impl TestServer { self.server_process.wait().unwrap(); self.server_process = Command::cargo_bin("influxdb_iox") .unwrap() - .arg("server") .arg("run") // Can enable for debugging //.arg("-vv") From 2530be6111cd3cd0f6eabe46a3527e9c0fab81b3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Mar 2021 14:35:16 -0500 Subject: [PATCH 7/9] docs: Add links to tech talks --- docs/README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/README.md b/docs/README.md index a046bf8a60..db26057ef6 100644 --- a/docs/README.md +++ b/docs/README.md @@ -4,6 +4,15 @@ This directory contains internal design documentation of potential interest for those who wish to understand how the code works. It is not intended to be general user facing documentation +## IOx Tech Talks + +We hold monthly Tech Talks that explain the project's technical underpinnings, which you can find at the links below: + +* December 2020: Rusty Introduction to Apache Arrow [recording](https://www.youtube.com/watch?v=dQFjKa9vKhM) +* Jan 2021: Data Lifecycle in InfluxDB IOx & How it Uses Object Storage for Persistence [recording](https://www.youtube.com/watch?v=KwdPifHC1Gc) +* February 2021: Intro to the InfluxDB IOx Read Buffer [recording](https://www.youtube.com/watch?v=KslD31VNqPU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-intro-to-the-influxdb-iox-read-buffer-a-readoptimized-inmemory-query-execution-engine) +* March 2021: Query Engine Design and the Rust-Based DataFusion in Apache Arrow[recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934) + ## Table of Contents: * Rust style and Idiom guide: [style_guide.md](style_guide.md) From a5a7e218402f3c8d9a47494afaf86069f6a5aa9d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Mar 2021 14:36:17 -0500 Subject: [PATCH 8/9] fix: space --- docs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index db26057ef6..21d23e7e43 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,7 +11,7 @@ We hold monthly Tech Talks that explain the project's technical underpinnings, w * December 2020: Rusty Introduction to Apache Arrow [recording](https://www.youtube.com/watch?v=dQFjKa9vKhM) * Jan 2021: Data Lifecycle in InfluxDB IOx & How it Uses Object Storage for Persistence [recording](https://www.youtube.com/watch?v=KwdPifHC1Gc) * February 2021: Intro to the InfluxDB IOx Read Buffer [recording](https://www.youtube.com/watch?v=KslD31VNqPU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-intro-to-the-influxdb-iox-read-buffer-a-readoptimized-inmemory-query-execution-engine) -* March 2021: Query Engine Design and the Rust-Based DataFusion in Apache Arrow[recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934) +* March 2021: Query Engine Design and the Rust-Based DataFusion in Apache Arrow [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934) ## Table of Contents: From 
f542d6216e10e8f309aff46e9e4bc272f1d29355 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Mar 2021 14:49:56 -0500
Subject: [PATCH 9/9] fix: Update docs/README.md

Co-authored-by: Paul Dix
---
 docs/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/README.md b/docs/README.md
index 21d23e7e43..c6608339df 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -6,7 +6,7 @@ not intended to be general user facing documentation
 
 ## IOx Tech Talks
 
-We hold monthly Tech Talks that explain the project's technical underpinnings, which you can find at the links below:
+We hold monthly Tech Talks that explain the project's technical underpinnings. You can register for the [InfluxDB IOx Tech Talks here](https://www.influxdata.com/community-showcase/influxdb-tech-talks/), or you can find links to previous sessions below:
 
 * December 2020: Rusty Introduction to Apache Arrow [recording](https://www.youtube.com/watch?v=dQFjKa9vKhM)
 * Jan 2021: Data Lifecycle in InfluxDB IOx & How it Uses Object Storage for Persistence [recording](https://www.youtube.com/watch?v=KwdPifHC1Gc)