Merge branch 'main' into cn/extract-iox-object-store

pull/24376/head
kodiakhq[bot] 2021-08-13 13:45:35 +00:00 committed by GitHub
commit d506da2a1a
17 changed files with 888 additions and 670 deletions

Cargo.lock (generated)

@ -161,8 +161,8 @@ dependencies = [
"prost 0.8.0",
"prost-derive 0.8.0",
"tokio",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
]
[[package]]
@ -270,7 +270,7 @@ dependencies = [
"log",
"md5",
"oauth2",
"paste 1.0.5",
"paste",
"quick-error",
"reqwest",
"serde",
@ -371,9 +371,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "2da1976d75adbe5fbc88130ecd119529cf1cc6a93ae1546d8696ee66f0d21af1"
[[package]]
name = "bitvec"
@ -850,13 +850,13 @@ dependencies = [
name = "datafusion"
version = "0.1.0"
dependencies = [
"datafusion 4.0.0-SNAPSHOT",
"datafusion 5.0.0",
]
[[package]]
name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=a5a58c4f23720eda63b02a6cad2902b715288db6#a5a58c4f23720eda63b02a6cad2902b715288db6"
version = "5.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fa3f0998dfe2dfedb0ab67665329e177e000fc64#fa3f0998dfe2dfedb0ab67665329e177e000fc64"
dependencies = [
"ahash 0.7.4",
"arrow",
@ -868,7 +868,7 @@ dependencies = [
"num_cpus",
"ordered-float 2.7.0",
"parquet",
"paste 1.0.5",
"paste",
"pin-project-lite",
"rand 0.8.4",
"smallvec",
@ -1303,8 +1303,8 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
]
[[package]]
@ -1371,7 +1371,7 @@ dependencies = [
"futures",
"grpc-router-test-gen",
"observability_deps",
"paste 1.0.5",
"paste",
"prost 0.8.0",
"prost-build 0.8.0",
"prost-types 0.8.0",
@ -1379,8 +1379,8 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic-reflection",
]
@ -1391,8 +1391,8 @@ dependencies = [
"prost 0.8.0",
"prost-build 0.8.0",
"prost-types 0.8.0",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
]
[[package]]
@ -1490,9 +1490,9 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9"
checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
dependencies = [
"bytes",
"http",
@ -1714,7 +1714,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.5.0",
"tonic 0.5.2",
"tonic-health",
"tonic-reflection",
"tracker",
@ -1740,7 +1740,7 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"tonic 0.5.0",
"tonic 0.5.2",
]
[[package]]
@ -1899,9 +1899,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.98"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790"
checksum = "a7f823d141fe0a24df1e23b4af4e3c7ba9e5966ec514ea068c93024aa7deb765"
[[package]]
name = "libloading"
@ -2004,9 +2004,9 @@ dependencies = [
[[package]]
name = "matches"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "md5"
@ -2174,9 +2174,9 @@ dependencies = [
[[package]]
name = "native-tls"
version = "0.2.7"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8d96b2e1c8da3957d58100b09f102c6d9cfdfced01b7ec5a8974044bb09dbd4"
checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d"
dependencies = [
"lazy_static",
"libc",
@ -2637,7 +2637,7 @@ dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.9",
"redox_syscall 0.2.10",
"smallvec",
"winapi",
]
@ -2707,31 +2707,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "paste"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45ca20c77d80be666aef2b45486da86238fabe33e38306bd3118fe4af33fa880"
dependencies = [
"paste-impl",
"proc-macro-hack",
]
[[package]]
name = "paste"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58"
[[package]]
name = "paste-impl"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d95a7db200b97ef370c8e6de0088252f7e0dfff7d047a28528e47456c0fc98b6"
dependencies = [
"proc-macro-hack",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@ -2929,9 +2910,9 @@ checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
[[package]]
name = "predicates-tree"
version = "1.0.2"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2"
checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d"
dependencies = [
"predicates-core",
"treeline",
@ -3125,9 +3106,9 @@ dependencies = [
[[package]]
name = "protobuf"
version = "2.24.1"
version = "2.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db50e77ae196458ccd3dc58a31ea1a90b0698ab1b7928d89f644c25d72070267"
checksum = "020f86b07722c5c4291f7c723eac4676b3892d47d9a7708dc2779696407f039b"
[[package]]
name = "query"
@ -3397,9 +3378,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "redox_syscall"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
dependencies = [
"bitflags",
]
@ -3422,7 +3403,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [
"getrandom 0.2.3",
"redox_syscall 0.2.9",
"redox_syscall 0.2.10",
]
[[package]]
@ -4007,9 +3988,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590"
[[package]]
name = "smallvec"
@ -4268,7 +4249,7 @@ dependencies = [
"cfg-if",
"libc",
"rand 0.8.4",
"redox_syscall 0.2.9",
"redox_syscall 0.2.10",
"remove_dir_all",
"winapi",
]
@ -4365,20 +4346,20 @@ dependencies = [
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f28c80e4338857639f443169a601fafe49866aed8d7a8d565c2f5bfb1a021adf"
checksum = "eb833c46ecbf8b6daeccb347cefcabf9c1beb5c9b0f853e1cec45632d9963e69"
dependencies = [
"libc",
"paste 0.1.18",
"paste",
"tikv-jemalloc-sys",
]
[[package]]
name = "tikv-jemalloc-sys"
version = "0.4.1+5.2.1-patched"
version = "0.4.2+5.2.1-patched.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a26331b05179d4cb505c8d6814a7e18d298972f0a551b0e3cefccff927f86d3"
checksum = "5844e429d797c62945a566f8da4e24c7fe3fbd5d6617fd8bf7a0b7dc1ee0f22e"
dependencies = [
"cc",
"fs_extra",
@ -4470,9 +4451,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c"
checksum = "01cf844b23c6131f624accf65ce0e4e9956a8bb329400ea5bcc26ae3a5c20b0b"
dependencies = [
"autocfg",
"bytes",
@ -4595,9 +4576,9 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.5.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584f064fdfc50017ec39162d5aebce49912f1eb16fd128e04b7f4ce4907c7e5"
checksum = "796c5e1cd49905e65dd8e700d4cb1dffcbfdb4fc9d017de08c1a537afd83627c"
dependencies = [
"async-stream",
"async-trait",
@ -4638,9 +4619,9 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d12faebbe071b06f486be82cc9318350814fdd07fcb28f3690840cd770599283"
checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08"
dependencies = [
"proc-macro2",
"prost-build 0.8.0",
@ -4650,17 +4631,17 @@ dependencies = [
[[package]]
name = "tonic-health"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14e6de0a7a1b27d9899031b01b83eb09fdc36f3fe8e6254a81840006a463c6d5"
checksum = "493fcae35818dffa28437b210a615119d791116c1cac80716f571f35dd55b1b9"
dependencies = [
"async-stream",
"bytes",
"prost 0.8.0",
"tokio",
"tokio-stream",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
]
[[package]]
@ -4674,8 +4655,8 @@ dependencies = [
"prost-types 0.8.0",
"tokio",
"tokio-stream",
"tonic 0.5.0",
"tonic-build 0.5.1",
"tonic 0.5.2",
"tonic-build 0.5.2",
]
[[package]]


@ -1,7 +1,6 @@
//! This module contains structs that describe the metadata for a partition
//! including schema, summary statistics, and file locations in storage.
use observability_deps::tracing::warn;
use serde::{Deserialize, Serialize};
use std::{
borrow::{Borrow, Cow},
@ -107,12 +106,15 @@ impl TableSummary {
// Validate that the counts are consistent across columns
for c in &self.columns {
// Restore to assert when https://github.com/influxdata/influxdb_iox/issues/2124 is fixed
if c.total_count() != count {
warn!(table_name=%self.name, column_name=%c.name,
column_count=c.total_count(), previous_count=count,
"Mismatch in statistics count, see #2124");
}
assert_eq!(
c.total_count(),
count,
"Mismatch counts in table {} column {}, expected {} got {}",
self.name,
c.name,
count,
c.total_count()
)
}
count
}
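Taken together, the hunk downgrades the statistics-count consistency check from a panic to a structured warning until influxdb_iox#2124 is fixed. For the capture syntax in the added `warn!` call: `%` records a field via its `Display` impl, while bare `name = value` records the value directly. A minimal standalone sketch with hypothetical values:

use observability_deps::tracing::warn;

fn main() {
    // Hypothetical table/column values; the field-capture syntax mirrors the diff.
    let (table_name, column_name) = ("cpu", "usage_user");
    warn!(table_name = %table_name, column_name = %column_name,
          column_count = 9_u64, previous_count = 10_u64,
          "Mismatch in statistics count, see #2124");
}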


@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
# Renamed to work around a doctest bug
# Turn off optional datafusion features (function packages)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="a5a58c4f23720eda63b02a6cad2902b715288db6", default-features = false, package = "datafusion" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="fa3f0998dfe2dfedb0ab67665329e177e000fc64", default-features = false, package = "datafusion" }


@ -65,6 +65,12 @@ pub enum Error {
db_name: String,
source: Box<parquet_file::catalog::Error>,
},
#[snafu(display("failed to skip replay for database ({}): {}", db_name, source))]
SkipReplay {
db_name: String,
source: Box<InitError>,
},
}
/// A `Database` represents a single configured IOx database - i.e. an entity with a corresponding
@ -263,6 +269,50 @@ impl Database {
Ok(())
})
}
/// Recover from a ReplayError by skipping replay
pub fn skip_replay(&self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
let db_name = &self.shared.config.name;
let (mut current_state, handle) = {
let state = self.shared.state.read();
let current_state = match &**state {
DatabaseState::ReplayError(rules_loaded, _) => rules_loaded.clone(),
_ => {
return Err(Error::InvalidState {
db_name: db_name.to_string(),
state: state.state_code(),
transition: "SkipReplay".to_string(),
})
}
};
let handle = state.try_freeze().ok_or(Error::TransitionInProgress {
db_name: db_name.to_string(),
state: state.state_code(),
})?;
(current_state, handle)
};
let shared = Arc::clone(&self.shared);
Ok(async move {
let db_name = &shared.config.name;
current_state.replay_plan = Arc::new(None);
let current_state = current_state
.advance()
.await
.map_err(Box::new)
.context(SkipReplay { db_name })?;
{
let mut state = shared.state.write();
*state.unfreeze(handle) = DatabaseState::Initialized(current_state);
}
Ok(())
})
}
}
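`skip_replay` uses a two-phase shape: the synchronous part validates and freezes the current state, and the returned future performs the actual transition. A hedged sketch of the recovery flow, mirroring the test below (error plumbing elided):

// Hypothetical operator flow: recover a database stuck in ReplayError.
if let Err(e) = database.wait_for_init().await {
    if matches!(e.as_ref(), InitError::Replay { .. }) {
        // The synchronous call surfaces InvalidState/TransitionInProgress
        // immediately; awaiting the returned future runs the skip itself.
        database.skip_replay()?.await?;
        database.wait_for_init().await?;
    }
}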
impl Drop for Database {
@ -713,8 +763,13 @@ pub(super) async fn persist_database_rules(
#[cfg(test)]
mod tests {
use chrono::Utc;
use data_types::database_rules::{PartitionTemplate, TemplatePart, WriteBufferConnection};
use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry};
use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
use super::*;
use std::num::NonZeroU32;
use std::{convert::TryFrom, num::NonZeroU32, time::Instant};
#[tokio::test]
async fn database_shutdown_waits_for_jobs() {
@ -771,4 +826,152 @@ mod tests {
// Shouldn't have waited for server tracker to finish
assert!(!server_dummy_job.is_complete());
}
#[tokio::test]
async fn skip_replay() {
// create write buffer
let state = MockBufferSharedState::empty_with_n_sequencers(1);
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("partition_by".to_string())],
};
let entry_a = lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
.pop()
.unwrap();
let entry_b = lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template)
.pop()
.unwrap();
state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 10),
Utc::now(),
entry_a,
));
state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 11),
Utc::now(),
entry_b,
));
// setup application
let mut factory = WriteBufferConfigFactory::new();
factory.register_mock("my_mock".to_string(), state.clone());
let application = Arc::new(ApplicationState::with_write_buffer_factory(
Arc::new(ObjectStore::new_in_memory()),
Arc::new(factory),
None,
));
let server_id = ServerId::try_from(1).unwrap();
// setup DB
let db_name = DatabaseName::new("test_db").unwrap();
let rules = DatabaseRules {
name: db_name.clone(),
partition_template: partition_template.clone(),
lifecycle_rules: data_types::database_rules::LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
},
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
write_buffer_connection: Some(WriteBufferConnection::Reading(
"mock://my_mock".to_string(),
)),
};
let store_prefix = application.object_store().new_path();
Database::create(Arc::clone(&application), &store_prefix, rules, server_id)
.await
.unwrap();
let db_config = DatabaseConfig {
name: db_name,
server_id,
store_prefix,
wipe_catalog_on_error: false,
skip_replay: false,
};
let database = Database::new(Arc::clone(&application), db_config.clone());
database.wait_for_init().await.unwrap();
// wait for ingest
let db = database.initialized_db().unwrap();
let t_0 = Instant::now();
loop {
// use the later partition here so that we implicitly wait for both entries
if db.partition_summary("table_1", "partition_by_b").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// waiting on the later partition b implies partition a has been ingested too
assert!(db.partition_summary("table_1", "partition_by_a").is_some());
// persist one partition
db.persist_partition(
"table_1",
"partition_by_b",
Instant::now() + Duration::from_secs(2),
)
.await
.unwrap();
// shutdown first database
database.shutdown();
database.join().await.unwrap();
// break write buffer by removing entries
state.clear_messages(0);
let entry_c = lp_to_entries("table_1,partition_by=c foo=3 30", &partition_template)
.pop()
.unwrap();
state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 12),
Utc::now(),
entry_c,
));
// boot actual test database
let database = Database::new(Arc::clone(&application), db_config.clone());
// db is broken
let err = database.wait_for_init().await.unwrap_err();
assert!(matches!(err.as_ref(), InitError::Replay { .. }));
// skip replay
database.skip_replay().unwrap().await.unwrap();
database.wait_for_init().await.unwrap();
// wait for ingest
let entry_d = lp_to_entries("table_1,partition_by=d foo=4 40", &partition_template)
.pop()
.unwrap();
state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 13),
Utc::now(),
entry_d,
));
let db = database.initialized_db().unwrap();
let t_0 = Instant::now();
loop {
if db.partition_summary("table_1", "partition_by_d").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// partition a was forgotten, partition b is still persisted, partition c was skipped
assert!(db.partition_summary("table_1", "partition_by_a").is_none());
assert!(db.partition_summary("table_1", "partition_by_b").is_some());
assert!(db.partition_summary("table_1", "partition_by_c").is_none());
// cannot skip when database is initialized
let res = database.skip_replay();
assert!(matches!(res, Err(Error::InvalidState { .. })));
// clean up
database.shutdown();
database.join().await.unwrap();
}
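The two ingestion-wait loops in this test differ only in the partition key; a hedged helper they could share (`Db` stands in for whatever `initialized_db()` returns):

// Hypothetical test helper; the body is the polling loop used twice above.
async fn wait_for_partition(db: &Db, table: &str, partition: &str) {
    let t_0 = Instant::now();
    while db.partition_summary(table, partition).is_none() {
        assert!(
            t_0.elapsed() < Duration::from_secs(10),
            "timed out waiting for partition {}",
            partition
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}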
}


@ -5,7 +5,7 @@ use structopt::StructOpt;
use thiserror::Error;
use influxdb_iox_client::{
connection::Builder,
connection::Connection,
flight,
format::QueryOutputFormat,
management::{
@ -30,9 +30,6 @@ pub enum Error {
#[error("Error listing databases: {0}")]
ListDatabaseError(#[from] ListDatabaseError),
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Error reading file {:?}: {}", file_name, source)]
ReadingFile {
file_name: PathBuf,
@ -175,9 +172,7 @@ enum Command {
Catalog(catalog::Config),
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url.clone()).await?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Create(command) => {
let mut client = management::Client::new(connection);
@ -272,13 +267,13 @@ pub async fn command(url: String, config: Config) -> Result<()> {
println!("{}", formatted_result);
}
Command::Chunk(config) => {
chunk::command(url, config).await?;
chunk::command(connection, config).await?;
}
Command::Partition(config) => {
partition::command(url, config).await?;
partition::command(connection, config).await?;
}
Command::Catalog(config) => {
catalog::command(url, config).await?;
catalog::command(connection, config).await?;
}
}
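This same signature change repeats in the command modules that follow (catalog, chunk, partition, operations, server, remote, sql): each `command` now receives a ready `Connection` instead of a URL, so dialing and connection-error handling happen exactly once in `main`. A minimal sketch of the new calling convention; the endpoint here is hypothetical (the gRPC default from this diff), and `Connection` is `Clone`, which the SQL REPL's `connection.clone()` call relies on:

let connection = Builder::default().build("http://127.0.0.1:8082").await?;
let mut client = management::Client::new(connection.clone());
commands::database::command(connection, config).await?;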


@ -3,7 +3,7 @@ use std::convert::TryInto;
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Builder,
connection::Connection,
management::{self, WipePersistedCatalogError},
};
use snafu::{ResultExt, Snafu};
@ -12,11 +12,6 @@ use structopt::StructOpt;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error connection to IOx: {}", source))]
ConnectionError {
source: influxdb_iox_client::connection::Error,
},
#[snafu(display("Need to pass `--force`"))]
NeedsTheForceError,
@ -56,11 +51,7 @@ struct Wipe {
db_name: String,
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default()
.build(url)
.await
.context(ConnectionError)?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = management::Client::new(connection);
match config.command {


@ -2,7 +2,7 @@
use data_types::chunk_metadata::ChunkSummary;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Builder,
connection::Connection,
management::{self, ListChunksError},
};
use std::convert::TryFrom;
@ -47,9 +47,7 @@ enum Command {
List(List),
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::List(get) => {
let List { db_name } = get;


@ -3,7 +3,7 @@ use data_types::chunk_metadata::ChunkSummary;
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Builder,
connection::Connection,
management::{
self, ClosePartitionChunkError, GetPartitionError, ListPartitionChunksError,
ListPartitionsError, NewPartitionChunkError, UnloadPartitionChunkError,
@ -39,9 +39,6 @@ pub enum Error {
#[error("Received invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -149,8 +146,7 @@ enum Command {
UnloadChunk(UnloadChunk),
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = management::Client::new(connection);
match config.command {


@ -1,7 +1,7 @@
use data_types::job::Operation;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Builder,
connection::Connection,
management,
operations::{self, Client},
};
@ -12,9 +12,6 @@ use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Client error: {0}")]
ClientError(#[from] operations::Error),
@ -68,9 +65,7 @@ enum Command {
Test { nanos: Vec<u64> },
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::List => {
let result: Result<Vec<Operation>, _> = Client::new(connection)


@ -1,9 +1,11 @@
//! Implementation of command line option for running server
use crate::influxdb_ioxd::{self, serving_readiness::ServingReadinessState};
use clap::arg_enum;
use crate::{
influxdb_ioxd::{self, serving_readiness::ServingReadinessState},
object_store::ObjectStoreConfig,
};
use data_types::server_id::ServerId;
use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
use std::{net::SocketAddr, net::ToSocketAddrs};
use structopt::StructOpt;
use thiserror::Error;
use trogging::cli::{LoggingConfig, TracingConfig};
@ -14,10 +16,6 @@ pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
/// The default bind address for the gRPC.
pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The AWS region to use for Amazon S3 based object storage if none is
/// specified.
pub const FALLBACK_AWS_REGION: &str = "us-east-1";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
@ -80,6 +78,10 @@ pub struct Config {
#[structopt(flatten)]
pub(crate) tracing_config: TracingConfig,
// object store config
#[structopt(flatten)]
pub(crate) object_store_config: ObjectStoreConfig,
/// The identifier for the server.
///
/// Used for writing to object storage and as an identifier that is added to
@ -107,10 +109,6 @@ pub struct Config {
)]
pub grpc_bind_address: SocketAddr,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
/// The number of threads to use for all worker pools.
///
/// IOx uses a pool with `--num-threads` threads *each* for
@ -122,128 +120,6 @@ pub struct Config {
#[structopt(long = "--num-worker-threads", env = "INFLUXDB_IOX_NUM_WORKER_THREADS")]
pub num_worker_threads: Option<usize>,
#[structopt(
long = "--object-store",
env = "INFLUXDB_IOX_OBJECT_STORE",
possible_values = &ObjectStore::variants(),
case_insensitive = true,
long_help = r#"Which object storage to use. If not specified, defaults to memory.
Possible values (case insensitive):
* memory (default): Effectively no object persistence.
* memorythrottled: Like `memory` but with latency and throughput that somewhat resemble a cloud
object store. Useful for testing and benchmarking.
* file: Stores objects in the local filesystem. Must also set `--data-dir`.
* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
possibly `--aws-default-region`.
* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
and `--azure-storage-access-key`.
"#,
)]
pub object_store: Option<ObjectStore>,
/// Name of the bucket to use for the object store. Must also set
/// `--object-store` to a cloud object storage to have any effect.
///
/// If using Google Cloud Storage for the object store, this item as well
/// as `--google-service-account` must be set.
///
/// If using S3 for the object store, must set this item as well
/// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
/// `--aws-default-region` if not using the fallback region.
///
/// If using Azure for the object store, set this item to the name of a
/// container you've created in the associated storage account, under
/// Blob Service > Containers. Must also set `--azure-storage-account` and
/// `--azure-storage-access-key`.
#[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
pub bucket: Option<String>,
/// When using Amazon S3 as the object store, set this to an access key that
/// has permission to read from and write to the specified S3 bucket.
///
/// Must also set `--object-store=s3`, `--bucket`, and
/// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
/// using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
pub aws_access_key_id: Option<String>,
/// When using Amazon S3 as the object store, set this to the secret access
/// key that goes with the specified access key ID.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
/// Can also set `--aws-default-region` if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
pub aws_secret_access_key: Option<String>,
/// When using Amazon S3 as the object store, set this to the region
/// that goes with the specified bucket if different from the fallback
/// value.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
/// and `--aws-secret-access-key`.
#[structopt(
long = "--aws-default-region",
env = "AWS_DEFAULT_REGION",
default_value = FALLBACK_AWS_REGION,
)]
pub aws_default_region: String,
/// When using Amazon S3 compatibility storage service, set this to the
/// endpoint.
///
/// Must also set `--object-store=s3`, `--bucket`. Can also set `--aws-default-region`
/// if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-endpoint", env = "AWS_ENDPOINT")]
pub aws_endpoint: Option<String>,
/// When using Amazon S3 as an object store, set this to the session token. This is handy when using a federated
/// login / SSO and you fetch credentials via the UI.
///
/// It is assumed that the session is valid as long as the IOx server is running.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-session-token", env = "AWS_SESSION_TOKEN")]
pub aws_session_token: Option<String>,
/// When using Google Cloud Storage as the object store, set this to the
/// path to the JSON file that contains the Google credentials.
///
/// Must also set `--object-store=google` and `--bucket`.
#[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
pub google_service_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to the
/// name you see when going to All Services > Storage accounts > `[name]`.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-access-key`.
#[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
pub azure_storage_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to one of the
/// Key values in the Storage account's Settings > Access keys.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-account`.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// When IOx nodes need to talk to remote peers they consult an internal remote address
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
/// a result, this config entry allows generating a hostname from a template:
@ -302,18 +178,6 @@ fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
.expect("name resolution should return at least one address"))
}
arg_enum! {
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ObjectStore {
Memory,
MemoryThrottled,
File,
S3,
Google,
Azure,
}
}
#[cfg(test)]
mod tests {


@ -19,9 +19,6 @@ pub enum Error {
#[error("Error updating server ID: {0}")]
UpdateServerIdError(#[from] UpdateServerIdError),
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Error checking if databases are loded: {0}")]
AreDatabasesLoadedError(#[from] GetServerStatusError),
@ -67,24 +64,24 @@ struct WaitSeverInitialized {
timeout: u64,
}
use influxdb_iox_client::{connection::Builder, management::*};
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(&url).await?;
let mut client = Client::new(connection);
use influxdb_iox_client::{connection::Connection, management::*};
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Set(command) => {
let mut client = Client::new(connection);
client.update_server_id(command.id).await?;
println!("Ok");
Ok(())
}
Command::Get => {
let mut client = Client::new(connection);
let id = client.get_server_id().await?;
println!("{}", id);
Ok(())
}
Command::WaitServerInitialized(command) => {
let mut client = Client::new(connection);
let end = Instant::now() + Duration::from_secs(command.timeout);
loop {
let status = client.get_server_status().await?;
@ -104,6 +101,6 @@ pub async fn command(url: String, config: Config) -> Result<()> {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Command::Remote(config) => Ok(server_remote::command(url, config).await?),
Command::Remote(config) => Ok(server_remote::command(connection, config).await?),
}
}


@ -1,4 +1,4 @@
use influxdb_iox_client::{connection::Builder, management};
use influxdb_iox_client::{connection::Connection, management};
use structopt::StructOpt;
use thiserror::Error;
@ -7,9 +7,6 @@ use prettytable::{format, Cell, Row, Table};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Update remote error: {0}")]
UpdateError(#[from] management::UpdateRemoteError),
@ -33,9 +30,7 @@ pub enum Config {
List,
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config {
Config::Set {
id,


@ -4,7 +4,7 @@ use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use structopt::StructOpt;
use influxdb_iox_client::connection::{Builder, Connection};
use influxdb_iox_client::connection::Connection;
mod observer;
mod repl;
@ -29,12 +29,6 @@ pub struct Config {
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error connecting to {}: {}", url, source))]
Connecting {
url: String,
source: influxdb_iox_client::connection::Error,
},
#[snafu(display("Storage health check failed: {}", source))]
HealthCheck {
source: influxdb_iox_client::health::Error,
@ -47,17 +41,13 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Fire up the interactive REPL
pub async fn command(url: String, config: Config) -> Result<()> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
debug!("Starting interactive SQL prompt with {:?}", config);
let connection = Builder::default()
.build(&url)
.await
.context(Connecting { url: &url })?;
println!("Connected to IOx Server at {}", url);
check_health(connection.clone()).await?;
println!("Connected to IOx Server");
let mut repl = repl::Repl::new(connection);
repl.set_output_format(config.format).context(Repl)?;


@ -1,7 +1,10 @@
use crate::commands::run::{Config, ObjectStore as ObjStoreOpt};
use futures::{future::FusedFuture, pin_mut, FutureExt, TryStreamExt};
use crate::{
commands::run::Config,
object_store::{check_object_store, warn_about_inmem_store},
};
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
use object_store::{self, path::ObjectStorePath, ObjectStore, ObjectStoreApi, ThrottleConfig};
use object_store::{self, ObjectStore};
use observability_deps::tracing::{error, info, warn};
use panic_logging::SendPanicsToTracing;
use server::{
@ -9,9 +12,7 @@ use server::{
Server as AppServer, ServerConfig,
};
use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, fs, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::time::Duration;
use uuid::Uuid;
use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
mod http;
mod planner;
@ -20,12 +21,6 @@ pub(crate) mod serving_readiness;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to create database directory {:?}: {}", path, source))]
CreatingDatabaseDirectory {
path: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Unable to bind to listen for HTTP requests on {}: {}",
bind_addr,
@ -52,30 +47,15 @@ pub enum Error {
#[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: rpc::Error },
#[snafu(display(
"Specified {} for the object store, required configuration missing for {}",
object_store,
missing
))]
MissingObjectStoreConfig {
object_store: ObjStoreOpt,
missing: String,
#[snafu(display("Cannot parse object store config: {}", source))]
ObjectStoreParsing {
source: crate::object_store::ParseError,
},
// Creating a new S3 object store can fail if the region is *specified* but
// not *parseable* as a rusoto `Region`. The other object store constructors
// don't return `Result`.
#[snafu(display("Error configuring Amazon S3: {}", source))]
InvalidS3Config { source: object_store::Error },
#[snafu(display("Error configuring GCS: {}", source))]
InvalidGCSConfig { source: object_store::Error },
#[snafu(display("Error configuring Microsoft Azure: {}", source))]
InvalidAzureConfig { source: object_store::Error },
#[snafu(display("Cannot read from object store: {}", source))]
CannotReadObjectStore { source: object_store::Error },
#[snafu(display("Cannot check object store config: {}", source))]
ObjectStoreCheck {
source: crate::object_store::CheckError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -102,17 +82,12 @@ async fn wait_for_signal() {
}
async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
match config.object_store {
Some(ObjStoreOpt::Memory) | None => {
warn!("NO PERSISTENCE: using Memory for object storage");
}
Some(store) => {
info!("Using {} for object storage", store);
}
}
let object_store = ObjectStore::try_from(config)?;
check_object_store(&object_store).await?;
warn_about_inmem_store(&config.object_store_config);
let object_store =
ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?;
check_object_store(&object_store)
.await
.context(ObjectStoreCheck)?;
let object_storage = Arc::new(object_store);
Ok(Arc::new(ApplicationState::new(
@ -320,332 +295,11 @@ async fn serve(
res
}
/// Check if the object store is properly configured and readable.
///
/// Note: this does NOT test whether the object store is writable!
async fn check_object_store(object_store: &ObjectStore) -> Result<()> {
// Use some prefix that will very likely end in an empty result, so we don't pull too much actual data here.
let uuid = Uuid::new_v4().to_string();
let mut prefix = object_store.new_path();
prefix.push_dir(&uuid);
// create stream (this might fail if the store is not readable)
let mut stream = object_store
.list(Some(&prefix))
.await
.context(CannotReadObjectStore)?;
// ... but sometimes it fails only if we use the resulting stream, so try that once
stream.try_next().await.context(CannotReadObjectStore)?;
// store seems to be readable
Ok(())
}
impl TryFrom<&Config> for ObjectStore {
type Error = Error;
fn try_from(config: &Config) -> Result<Self, Self::Error> {
match config.object_store {
Some(ObjStoreOpt::Memory) | None => Ok(Self::new_in_memory()),
Some(ObjStoreOpt::MemoryThrottled) => {
let config = ThrottleConfig {
// for every call: assume a 100ms latency
wait_delete_per_call: Duration::from_millis(100),
wait_get_per_call: Duration::from_millis(100),
wait_list_per_call: Duration::from_millis(100),
wait_list_with_delimiter_per_call: Duration::from_millis(100),
wait_put_per_call: Duration::from_millis(100),
// for list operations: assume we need 1 call per 1k entries at 100ms
wait_list_per_entry: Duration::from_millis(100) / 1_000,
wait_list_with_delimiter_per_entry: Duration::from_millis(100) / 1_000,
// for upload/download: assume 1GByte/s
wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
};
Ok(Self::new_in_memory_throttled(config))
}
Some(ObjStoreOpt::Google) => {
match (
config.bucket.as_ref(),
config.google_service_account.as_ref(),
) {
(Some(bucket), Some(service_account)) => {
Self::new_google_cloud_storage(service_account, bucket)
.context(InvalidGCSConfig)
}
(bucket, service_account) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if service_account.is_none() {
missing_args.push("google-service-account");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::Google,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjStoreOpt::S3) => {
match (
config.bucket.as_ref(),
config.aws_access_key_id.as_ref(),
config.aws_secret_access_key.as_ref(),
config.aws_default_region.as_str(),
config.aws_endpoint.as_ref(),
config.aws_session_token.as_ref(),
) {
(Some(bucket), key_id, secret_key, region, endpoint, session_token) => {
Self::new_amazon_s3(
key_id,
secret_key,
region,
bucket,
endpoint,
session_token,
)
.context(InvalidS3Config)
}
(bucket, _, _, _, _, _) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::S3,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjStoreOpt::Azure) => {
match (
config.bucket.as_ref(),
config.azure_storage_account.as_ref(),
config.azure_storage_access_key.as_ref(),
) {
(Some(bucket), Some(storage_account), Some(access_key)) => {
Self::new_microsoft_azure(storage_account, access_key, bucket)
.context(InvalidAzureConfig)
}
(bucket, storage_account, access_key) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if storage_account.is_none() {
missing_args.push("azure-storage-account");
}
if access_key.is_none() {
missing_args.push("azure-storage-access-key");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::Azure,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
Some(db_dir) => {
fs::create_dir_all(db_dir)
.context(CreatingDatabaseDirectory { path: db_dir })?;
Ok(Self::new_file(&db_dir))
}
None => MissingObjectStoreConfig {
object_store: ObjStoreOpt::File,
missing: "data-dir",
}
.fail(),
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use data_types::{database_rules::DatabaseRules, DatabaseName};
use object_store::ObjectStoreIntegration;
use structopt::StructOpt;
use tempfile::TempDir;
#[test]
fn default_object_store_is_memory() {
let config = Config::from_iter_safe(&["server"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::InMemory(_)));
}
#[test]
fn explicitly_set_object_store_to_memory() {
let config = Config::from_iter_safe(&["server", "--object-store", "memory"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::InMemory(_)));
}
#[test]
#[cfg(feature = "aws")]
fn valid_s3_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"s3",
"--bucket",
"mybucket",
"--aws-access-key-id",
"NotARealAWSAccessKey",
"--aws-secret-access-key",
"NotARealAWSSecretAccessKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::AmazonS3(_)));
}
#[test]
fn s3_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified S3 for the object store, required configuration missing for bucket"
);
}
#[test]
#[cfg(feature = "gcp")]
fn valid_google_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"google",
"--bucket",
"mybucket",
"--google-service-account",
"~/Not/A/Real/path.json",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(
integration,
ObjectStoreIntegration::GoogleCloudStorage(_)
));
}
#[test]
fn google_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Google for the object store, required configuration missing for \
bucket, google-service-account"
);
}
#[test]
#[cfg(feature = "azure")]
fn valid_azure_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"azure",
"--bucket",
"mybucket",
"--azure-storage-account",
"NotARealStorageAccount",
"--azure-storage-access-key",
"NotARealKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(
integration,
ObjectStoreIntegration::MicrosoftAzure(_)
));
}
#[test]
fn azure_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Azure for the object store, required configuration missing for \
bucket, azure-storage-account, azure-storage-access-key"
);
}
#[test]
fn valid_file_config() {
let root = TempDir::new().unwrap();
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"file",
"--data-dir",
root.path().to_str().unwrap(),
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::File(_)));
}
#[test]
fn file_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified File for the object store, required configuration missing for \
data-dir"
);
}
#[tokio::test]
async fn test_server_shutdown() {


@ -367,7 +367,7 @@ where
.get("/api/v1/partitions", list_partitions::<M>)
.get("/debug/pprof", pprof_home::<M>)
.get("/debug/pprof/profile", pprof_profile::<M>)
.get("/debug/pprof/heappy/profile", pprof_heappy_profile::<M>)
.get("/debug/pprof/allocs", pprof_heappy_profile::<M>)
// Specify the error handler to handle any errors caused by
// a route or any middleware.
.err_handler_with_info(error_handler)
@ -764,13 +764,17 @@ async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
.unwrap_or(&default_host)
.to_str()
.unwrap_or_default();
let cmd = format!(
let profile_cmd = format!(
"/debug/pprof/profile?seconds={}",
PProfArgs::default_seconds()
);
let allocs_cmd = format!(
"/debug/pprof/allocs?seconds={}",
PProfArgs::default_seconds()
);
Ok(Response::new(Body::from(format!(
r#"<a href="{}">http://{}{}</a>"#,
cmd, host, cmd
r#"<a href="{}">http://{}{}</a><br><a href="{}">http://{}{}</a>"#,
profile_cmd, host, profile_cmd, allocs_cmd, host, allocs_cmd,
))))
}
@ -874,7 +878,7 @@ async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'stat
let mut body: Vec<u8> = Vec::new();
// render flamegraph when opening in the browser
// otherwise render as protobuf; works great with: go tool pprof http://..../debug/pprof/heappy/profile
// otherwise render as protobuf; works great with: go tool pprof http://..../debug/pprof/allocs
if req
.headers()
.get_all("Accept")
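With the heap profile now served under the conventional `/debug/pprof/allocs` path, the stock tooling flow from the comment above applies to both endpoints, e.g. `go tool pprof http://127.0.0.1:8080/debug/pprof/allocs?seconds=30` (host and port assumed from DEFAULT_API_BIND_ADDR earlier in this diff; the seconds value is arbitrary).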


@ -17,6 +17,7 @@ use commands::tracing::{init_logs_and_tracing, init_simple_logs};
use observability_deps::tracing::warn;
use crate::commands::tracing::TracingGuard;
use influxdb_iox_client::connection::Builder;
use tikv_jemallocator::Jemalloc;
mod commands {
@ -29,6 +30,8 @@ mod commands {
pub mod tracing;
}
mod object_store;
pub mod influxdb_ioxd;
#[global_allocator]
@ -149,6 +152,16 @@ fn main() -> Result<(), std::io::Error> {
let host = config.host;
let log_verbose_count = config.log_verbose_count;
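// Connect lazily: only subcommands that actually talk to a server await
// this closure, and a failed connection prints the error and exits instead
// of threading a connection error through every command's error type.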
let connection = || async move {
match Builder::default().build(&host).await {
Ok(connection) => connection,
Err(e) => {
eprintln!("Error connecting to {}: {}", host, e);
std::process::exit(ReturnCode::Failure as _)
}
}
};
fn handle_init_logs(r: Result<TracingGuard, trogging::Error>) -> TracingGuard {
match r {
Ok(guard) => guard,
@ -162,21 +175,24 @@ fn main() -> Result<(), std::io::Error> {
match config.command {
Command::Database(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::database::command(host, config).await {
let connection = connection().await;
if let Err(e) = commands::database::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Operation(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::operations::command(host, config).await {
let connection = connection().await;
if let Err(e) = commands::operations::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Server(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::server::command(host, config).await {
let connection = connection().await;
if let Err(e) = commands::server::command(connection, config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
@ -191,7 +207,8 @@ fn main() -> Result<(), std::io::Error> {
}
Command::Sql(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::sql::command(host, config).await {
let connection = connection().await;
if let Err(e) = commands::sql::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}

src/object_store.rs (new file)

@ -0,0 +1,536 @@
//! CLI handling for object store config (via CLI arguments and environment variables).
use std::{convert::TryFrom, fs, path::PathBuf, time::Duration};
use clap::arg_enum;
use futures::TryStreamExt;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi, ThrottleConfig};
use observability_deps::tracing::{info, warn};
use snafu::{ResultExt, Snafu};
use structopt::StructOpt;
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum ParseError {
#[snafu(display("Unable to create database directory {:?}: {}", path, source))]
CreatingDatabaseDirectory {
path: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Specified {} for the object store, required configuration missing for {}",
object_store,
missing
))]
MissingObjectStoreConfig {
object_store: ObjectStoreType,
missing: String,
},
// Creating a new S3 object store can fail if the region is *specified* but
// not *parseable* as a rusoto `Region`. The other object store constructors
// don't return `Result`.
#[snafu(display("Error configuring Amazon S3: {}", source))]
InvalidS3Config { source: object_store::Error },
#[snafu(display("Error configuring GCS: {}", source))]
InvalidGCSConfig { source: object_store::Error },
#[snafu(display("Error configuring Microsoft Azure: {}", source))]
InvalidAzureConfig { source: object_store::Error },
}
/// The AWS region to use for Amazon S3 based object storage if none is
/// specified.
pub const FALLBACK_AWS_REGION: &str = "us-east-1";
/// CLI config for object stores.
#[derive(Debug, StructOpt, Clone)]
pub struct ObjectStoreConfig {
#[structopt(
long = "--object-store",
env = "INFLUXDB_IOX_OBJECT_STORE",
possible_values = &ObjectStoreType::variants(),
case_insensitive = true,
long_help = r#"Which object storage to use. If not specified, defaults to memory.
Possible values (case insensitive):
* memory (default): Effectively no object persistence.
* memorythrottled: Like `memory` but with latency and throughput that somewhat resemble a cloud
object store. Useful for testing and benchmarking.
* file: Stores objects in the local filesystem. Must also set `--data-dir`.
* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
possibly `--aws-default-region`.
* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
and `--azure-storage-access-key`.
"#,
)]
pub object_store: Option<ObjectStoreType>,
/// Name of the bucket to use for the object store. Must also set
/// `--object-store` to a cloud object storage to have any effect.
///
/// If using Google Cloud Storage for the object store, this item as well
/// as `--google-service-account` must be set.
///
/// If using S3 for the object store, must set this item as well
/// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
/// `--aws-default-region` if not using the fallback region.
///
/// If using Azure for the object store, set this item to the name of a
/// container you've created in the associated storage account, under
/// Blob Service > Containers. Must also set `--azure-storage-account` and
/// `--azure-storage-access-key`.
#[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
pub bucket: Option<String>,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
/// When using Amazon S3 as the object store, set this to an access key that
/// has permission to read from and write to the specified S3 bucket.
///
/// Must also set `--object-store=s3`, `--bucket`, and
/// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
/// using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
pub aws_access_key_id: Option<String>,
/// When using Amazon S3 as the object store, set this to the secret access
/// key that goes with the specified access key ID.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
/// Can also set `--aws-default-region` if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
pub aws_secret_access_key: Option<String>,
/// When using Amazon S3 as the object store, set this to the region
/// that goes with the specified bucket if different from the fallback
/// value.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
/// and `--aws-secret-access-key`.
#[structopt(
long = "--aws-default-region",
env = "AWS_DEFAULT_REGION",
default_value = FALLBACK_AWS_REGION,
)]
pub aws_default_region: String,
/// When using Amazon S3 compatibility storage service, set this to the
/// endpoint.
///
/// Must also set `--object-store=s3`, `--bucket`. Can also set `--aws-default-region`
/// if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-endpoint", env = "AWS_ENDPOINT")]
pub aws_endpoint: Option<String>,
/// When using Amazon S3 as an object store, set this to the session token. This is handy when using a federated
/// login / SSO and you fetch credentials via the UI.
///
/// It is assumed that the session is valid as long as the IOx server is running.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-session-token", env = "AWS_SESSION_TOKEN")]
pub aws_session_token: Option<String>,
/// When using Google Cloud Storage as the object store, set this to the
/// path to the JSON file that contains the Google credentials.
///
/// Must also set `--object-store=google` and `--bucket`.
#[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
pub google_service_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to the
/// name you see when going to All Services > Storage accounts > `[name]`.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-access-key`.
#[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
pub azure_storage_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to one of the
/// Key values in the Storage account's Settings > Access keys.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-account`.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
}
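Because the whole block is an ordinary `StructOpt` type, any binary can reuse these flags by flattening it, exactly as the `run` command's `Config` does above; a minimal hypothetical consumer:

use structopt::StructOpt;

#[derive(Debug, StructOpt)]
struct MyServerConfig {
    // Pulls in --object-store, --bucket, --data-dir, and the rest.
    #[structopt(flatten)]
    object_store_config: ObjectStoreConfig,
}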
arg_enum! {
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ObjectStoreType {
Memory,
MemoryThrottled,
File,
S3,
Google,
Azure,
}
}
pub fn warn_about_inmem_store(config: &ObjectStoreConfig) {
match config.object_store {
Some(ObjectStoreType::Memory) | None => {
warn!("NO PERSISTENCE: using Memory for object storage");
}
Some(store) => {
info!("Using {} for object storage", store);
}
}
}
impl TryFrom<&ObjectStoreConfig> for ObjectStore {
type Error = ParseError;
fn try_from(config: &ObjectStoreConfig) -> Result<Self, Self::Error> {
match config.object_store {
Some(ObjectStoreType::Memory) | None => Ok(Self::new_in_memory()),
Some(ObjectStoreType::MemoryThrottled) => {
let config = ThrottleConfig {
// for every call: assume a 100ms latency
wait_delete_per_call: Duration::from_millis(100),
wait_get_per_call: Duration::from_millis(100),
wait_list_per_call: Duration::from_millis(100),
wait_list_with_delimiter_per_call: Duration::from_millis(100),
wait_put_per_call: Duration::from_millis(100),
// for list operations: assume we need 1 call per 1k entries at 100ms
wait_list_per_entry: Duration::from_millis(100) / 1_000,
wait_list_with_delimiter_per_entry: Duration::from_millis(100) / 1_000,
// for upload/download: assume 1GByte/s
wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
};
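// Example arithmetic under these settings (hypothetical workload): listing
// 1,000 entries waits ~100 ms (one call) + 1,000 * 0.1 ms per entry = ~200 ms,
// and a 1 GB get or put adds roughly one second of simulated transfer time.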
Ok(Self::new_in_memory_throttled(config))
}
Some(ObjectStoreType::Google) => {
match (
config.bucket.as_ref(),
config.google_service_account.as_ref(),
) {
(Some(bucket), Some(service_account)) => {
Self::new_google_cloud_storage(service_account, bucket)
.context(InvalidGCSConfig)
}
(bucket, service_account) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if service_account.is_none() {
missing_args.push("google-service-account");
}
MissingObjectStoreConfig {
object_store: ObjectStoreType::Google,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjectStoreType::S3) => {
match (
config.bucket.as_ref(),
config.aws_access_key_id.as_ref(),
config.aws_secret_access_key.as_ref(),
config.aws_default_region.as_str(),
config.aws_endpoint.as_ref(),
config.aws_session_token.as_ref(),
) {
(Some(bucket), key_id, secret_key, region, endpoint, session_token) => {
Self::new_amazon_s3(
key_id,
secret_key,
region,
bucket,
endpoint,
session_token,
)
.context(InvalidS3Config)
}
(bucket, _, _, _, _, _) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
MissingObjectStoreConfig {
object_store: ObjectStoreType::S3,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjectStoreType::Azure) => {
match (
config.bucket.as_ref(),
config.azure_storage_account.as_ref(),
config.azure_storage_access_key.as_ref(),
) {
(Some(bucket), Some(storage_account), Some(access_key)) => {
Self::new_microsoft_azure(storage_account, access_key, bucket)
.context(InvalidAzureConfig)
}
(bucket, storage_account, access_key) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if storage_account.is_none() {
missing_args.push("azure-storage-account");
}
if access_key.is_none() {
missing_args.push("azure-storage-access-key");
}
MissingObjectStoreConfig {
object_store: ObjectStoreType::Azure,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjectStoreType::File) => match config.database_directory.as_ref() {
Some(db_dir) => {
fs::create_dir_all(db_dir)
.context(CreatingDatabaseDirectory { path: db_dir })?;
Ok(Self::new_file(&db_dir))
}
None => MissingObjectStoreConfig {
object_store: ObjectStoreType::File,
missing: "data-dir",
}
.fail(),
},
}
}
}
#[derive(Debug, Snafu)]
pub enum CheckError {
#[snafu(display("Cannot read from object store: {}", source))]
CannotReadObjectStore { source: object_store::Error },
}
/// Check if the object store is properly configured and readable.
///
/// Note: this does NOT test whether the object store is writable!
pub async fn check_object_store(object_store: &ObjectStore) -> Result<(), CheckError> {
// Use some prefix that will very likely end in an empty result, so we don't pull too much actual data here.
let uuid = Uuid::new_v4().to_string();
let mut prefix = object_store.new_path();
prefix.push_dir(&uuid);
// create stream (this might fail if the store is not readable)
let mut stream = object_store
.list(Some(&prefix))
.await
.context(CannotReadObjectStore)?;
// ... but sometimes it fails only if we use the resulting stream, so try that once
stream.try_next().await.context(CannotReadObjectStore)?;
// store seems to be readable
Ok(())
}
#[cfg(test)]
mod tests {
use object_store::ObjectStoreIntegration;
use tempfile::TempDir;
use super::*;
#[test]
fn default_object_store_is_memory() {
let config = ObjectStoreConfig::from_iter_safe(&["server"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::InMemory(_)));
}
#[test]
fn explicitly_set_object_store_to_memory() {
let config =
ObjectStoreConfig::from_iter_safe(&["server", "--object-store", "memory"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::InMemory(_)));
}
#[test]
#[cfg(feature = "aws")]
fn valid_s3_config() {
let config = ObjectStoreConfig::from_iter_safe(&[
"server",
"--object-store",
"s3",
"--bucket",
"mybucket",
"--aws-access-key-id",
"NotARealAWSAccessKey",
"--aws-secret-access-key",
"NotARealAWSSecretAccessKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::AmazonS3(_)));
}
#[test]
fn s3_config_missing_params() {
let config =
ObjectStoreConfig::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified S3 for the object store, required configuration missing for bucket"
);
}
#[test]
#[cfg(feature = "gcp")]
fn valid_google_config() {
let config = ObjectStoreConfig::from_iter_safe(&[
"server",
"--object-store",
"google",
"--bucket",
"mybucket",
"--google-service-account",
"~/Not/A/Real/path.json",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(
integration,
ObjectStoreIntegration::GoogleCloudStorage(_)
));
}
#[test]
fn google_config_missing_params() {
let config =
ObjectStoreConfig::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Google for the object store, required configuration missing for \
bucket, google-service-account"
);
}
#[test]
#[cfg(feature = "azure")]
fn valid_azure_config() {
let config = ObjectStoreConfig::from_iter_safe(&[
"server",
"--object-store",
"azure",
"--bucket",
"mybucket",
"--azure-storage-account",
"NotARealStorageAccount",
"--azure-storage-access-key",
"NotARealKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(
integration,
ObjectStoreIntegration::MicrosoftAzure(_)
));
}
#[test]
fn azure_config_missing_params() {
let config =
ObjectStoreConfig::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Azure for the object store, required configuration missing for \
bucket, azure-storage-account, azure-storage-access-key"
);
}
#[test]
fn valid_file_config() {
let root = TempDir::new().unwrap();
let config = ObjectStoreConfig::from_iter_safe(&[
"server",
"--object-store",
"file",
"--data-dir",
root.path().to_str().unwrap(),
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
let ObjectStore { integration, .. } = object_store;
assert!(matches!(integration, ObjectStoreIntegration::File(_)));
}
#[test]
fn file_config_missing_params() {
let config =
ObjectStoreConfig::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified File for the object store, required configuration missing for \
data-dir"
);
}
}