fix: Remove OG gRPC client code and APIs

pull/24376/head
Carol (Nichols || Goulding) 2022-04-29 11:19:02 -04:00
parent 8245ff83ac
commit a4443e4c31
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
18 changed files with 8 additions and 2591 deletions

View File

@@ -1,12 +1,9 @@
//! This module implements the `database` CLI command
use crate::TABLE_STYLE_SINGLE_LINE_BORDERS;
use comfy_table::{Cell, Table};
use influxdb_iox_client::{
connection::Connection,
flight::{self, generated_types::ReadInfo},
format::QueryOutputFormat,
management::{self, generated_types::database_status::DatabaseState, generated_types::*},
write,
};
use iox_time::TimeProvider;
@@ -14,10 +11,6 @@ use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time
use thiserror::Error;
use uuid::Uuid;
mod chunk;
mod partition;
mod recover;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
@@ -33,18 +26,9 @@ pub enum Error {
#[error("Error querying: {0}")]
Query(#[from] influxdb_iox_client::flight::Error),
#[error("Error in chunk subcommand: {0}")]
Chunk(#[from] chunk::Error),
#[error("Error in partition subcommand: {0}")]
Partition(#[from] partition::Error),
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Error in partition subcommand: {0}")]
Catalog(#[from] recover::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
@@ -211,158 +195,15 @@ struct Restart {
/// All possible subcommands for database
#[derive(Debug, clap::Parser)]
enum Command {
/// Create a new database
Create(Create),
/// Get list of databases
List(List),
/// Return configuration of specific database
Get(Get),
/// Write data into the specified database
Write(Write),
/// Query the data with SQL
Query(Query),
/// Manage database chunks
Chunk(chunk::Config),
/// Manage database partitions
Partition(partition::Config),
/// Recover broken databases
Recover(recover::Config),
/// Release a database from its current server owner
Release(Release),
/// Claim an unowned database
Claim(Claim),
/// Shutdown a database
Shutdown(Shutdown),
/// Restart a database
Restart(Restart),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Create(command) => {
let mut client = management::Client::new(connection);
#[allow(deprecated)]
let rules = DatabaseRules {
name: command.name.clone(),
lifecycle_rules: Some(LifecycleRules {
buffer_size_soft: command.buffer_size_soft as _,
buffer_size_hard: command.buffer_size_hard as _,
persist: command.persist,
immutable: command.immutable,
worker_backoff_millis: Default::default(),
max_active_compactions_cfg: Default::default(),
catalog_transactions_until_checkpoint: command
.catalog_transactions_until_checkpoint
.get(),
catalog_transaction_prune_age: Some(
command.catalog_transaction_prune_age.into(),
),
late_arrive_window_seconds: command.late_arrive_window_seconds,
persist_row_threshold: command.persist_row_threshold,
persist_age_threshold_seconds: command.persist_age_threshold_seconds,
mub_row_threshold: command.mub_row_threshold,
parquet_cache_limit: command.parquet_cache_limit,
}),
// Default to hourly partitions
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Time(
"%Y-%m-%d %H:00:00".into(),
)),
}],
}),
// Note no write buffer config
..Default::default()
};
let uuid = client.create_database(rules).await?;
println!("Created database {}", command.name);
println!("{}", uuid);
}
Command::List(list) => {
let mut client = management::Client::new(connection);
if list.detailed {
let ServerStatus {
initialized,
error,
database_statuses,
} = client.get_server_status().await?;
if !initialized {
eprintln!("Can not list databases. Server is not yet initialized");
if let Some(err) = error {
println!("WARNING: Server is in error state: {}", err.message);
}
return Ok(());
}
if !database_statuses.is_empty() {
let mut table = Table::new();
table.load_preset(TABLE_STYLE_SINGLE_LINE_BORDERS);
table.set_header(vec![
Cell::new("Name"),
Cell::new("UUID"),
Cell::new("State"),
Cell::new("Error"),
]);
for database in database_statuses {
let uuid = if !database.uuid.is_empty() {
Uuid::from_slice(&database.uuid)
.map(|uuid| uuid.to_string())
.unwrap_or_else(|_| String::from("<UUID parsing failed>"))
} else {
String::from("<UUID not yet known>")
};
let state = DatabaseState::from_i32(database.state)
.map(|state| state.description())
.unwrap_or("UNKNOWN STATE");
let error = database
.error
.map(|e| e.message)
.unwrap_or_else(|| String::from("<none>"));
table.add_row(vec![
Cell::new(&database.db_name),
Cell::new(&uuid),
Cell::new(&state),
Cell::new(&error),
]);
}
println!("{}", table);
}
} else {
let names = client.list_database_names().await?;
if !names.is_empty() {
println!("{}", names.join("\n"))
}
}
}
Command::Get(get) => {
let Get {
name,
omit_defaults,
} = get;
let mut client = management::Client::new(connection);
let database = client.get_database(name, omit_defaults).await?;
println!("{}", serde_json::to_string_pretty(&database)?);
}
Command::Write(write) => {
let mut client = write::Client::new(connection);
@@ -411,38 +252,6 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
println!("{}", formatted_result);
}
Command::Chunk(config) => {
chunk::command(connection, config).await?;
}
Command::Partition(config) => {
partition::command(connection, config).await?;
}
Command::Recover(config) => {
recover::command(connection, config).await?;
}
Command::Release(command) => {
let mut client = management::Client::new(connection);
let uuid = client.release_database(&command.name, command.uuid).await?;
println!("Released database {}", command.name);
println!("{}", uuid);
}
Command::Claim(command) => {
let mut client = management::Client::new(connection);
let db_name = client.claim_database(command.uuid, command.force).await?;
println!("Claimed database {}", db_name);
}
Command::Shutdown(command) => {
let mut client = management::Client::new(connection);
client.shutdown_database(command.name).await?;
println!("Database shutdown");
}
Command::Restart(command) => {
let mut client = management::Client::new(connection);
client
.restart_database(command.name, command.skip_replay)
.await?;
println!("Database restarted");
}
}
Ok(())
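
Aside: the `partition_template` in the removed `Create` arm above uses a strftime-style format string, so each row's timestamp renders to a key such as `2022-04-29 11:00:00` and all rows from the same hour share a partition. A minimal sketch of that mapping, assuming strftime semantics and using the `chrono` crate (not part of this commit):

use chrono::{TimeZone, Utc};

fn partition_key(template: &str, ts_nanos: i64) -> String {
    // Render the row timestamp with the partition template, e.g.
    // "%Y-%m-%d %H:00:00" -> "2022-04-29 14:00:00".
    Utc.timestamp_nanos(ts_nanos).format(template).to_string()
}

fn main() {
    // Two writes ten minutes apart land in the same hourly partition.
    let a = partition_key("%Y-%m-%d %H:00:00", 1_651_244_342_000_000_000);
    let b = partition_key("%Y-%m-%d %H:00:00", 1_651_244_942_000_000_000);
    assert_eq!(a, b);
}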

View File

@@ -1,117 +0,0 @@
//! This module implements the `chunk` CLI command
use std::str::FromStr;
use data_types::chunk_metadata::{ChunkId, ChunkIdConversionError};
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Connection,
management::{self, generated_types::Chunk},
};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error interpreting server response: {0}")]
ConvertingResponse(#[from] FieldViolation),
#[error("Error rendering response as JSON: {0}")]
WritingJson(#[from] serde_json::Error),
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Chunk {value:?} not found")]
ChunkNotFound { value: String },
#[error("Invalid chunk ID: {0}")]
InvalidChunkIDError(#[from] ChunkIdConversionError),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx chunks
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// List the chunks for the specified database in JSON format
#[derive(Debug, clap::Parser)]
struct List {
/// The name of the database
db_name: String,
}
/// Loads the specified chunk in the specified database from the Object Store to the Read Buffer.
#[derive(Debug, clap::Parser)]
struct Load {
/// The name of the database
db_name: String,
/// The ID of the chunk
chunk_id: String,
}
/// All possible subcommands for chunk
#[derive(Debug, clap::Parser)]
enum Command {
List(List),
Load(Load),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::List(get) => {
let List { db_name } = get;
let mut client = management::Client::new(connection);
let chunks = client.list_chunks(db_name).await?;
serde_json::to_writer_pretty(std::io::stdout(), &chunks)?;
}
Command::Load(load) => {
let Load { db_name, chunk_id } = load;
let mut client = management::Client::new(connection);
let chunks = client.list_chunks(&db_name).await?;
let load_chunk_id = ChunkId::from_str(&chunk_id)?;
for chunk in chunks {
let id: ChunkId = chunk
.id
.clone()
.try_into()
.expect("catalog chunk IDs to be valid");
if id == load_chunk_id {
return load_chunk_to_read_buffer(&mut client, &db_name, chunk).await;
}
}
return Err(Error::ChunkNotFound {
value: load_chunk_id.to_string(),
});
}
}
Ok(())
}
async fn load_chunk_to_read_buffer(
client: &mut management::Client,
db_name: &str,
chunk: Chunk,
) -> Result<()> {
let operation = client
.load_partition_chunk(db_name, chunk.table_name, chunk.partition_key, chunk.id)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
Ok(())
}

View File

@@ -1,440 +0,0 @@
//! This module implements the `partition` CLI command
use data_types::chunk_metadata::ChunkStorage;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Connection,
management::{self},
};
use std::collections::BTreeSet;
use thiserror::Error;
use uuid::Uuid;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
#[error("Error rendering response as JSON: {0}")]
WritingJson(#[from] serde_json::Error),
#[error("Received invalid response: {0}")]
InvalidResponse(#[from] FieldViolation),
#[error("Must either specify --table-name or --all-tables")]
MissingTableName,
#[error("Must either specify a --partition-key or --all-partitions")]
MissingPartitionKey,
#[error("Some operations returned an error, but --continue-on-error passed")]
ContinuedOnError,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx partitions
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// List all known partition keys for a database
#[derive(Debug, clap::Parser)]
struct List {
/// The name of the database
db_name: String,
}
/// Get details of a specific partition in JSON format (TODO)
#[derive(Debug, clap::Parser)]
struct Get {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
}
/// Persist partition.
///
/// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the
/// chunk that contains the persisted data.
#[derive(Debug, clap::Parser)]
struct Persist {
/// The name of the database
db_name: String,
/// The partition key
#[clap(long)]
partition_key: Option<String>,
/// The table name
#[clap(long)]
table_name: Option<String>,
/// Persist all data irrespective of arrival time
#[clap(long)]
force: bool,
/// Persist all tables that have data
#[clap(long)]
all_tables: bool,
/// Persist all partitions that have data
#[clap(long)]
all_partitions: bool,
/// Continue on error
#[clap(long)]
continue_on_error: bool,
}
/// Compact Object Store Chunks
///
/// Errors if the chunks are not yet compacted and not contiguous.
#[derive(Debug, clap::Parser)]
struct CompactObjectStoreChunks {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
/// The chunk ids
chunk_ids: Vec<Uuid>,
}
/// Compact all Object Store Chunks of a partition
#[derive(Debug, clap::Parser)]
struct CompactObjectStorePartition {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
}
/// lists all chunks in this partition
#[derive(Debug, clap::Parser)]
struct ListChunks {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
}
/// Create a new, open chunk in the partition's Mutable Buffer which will receive
/// new writes.
#[derive(Debug, clap::Parser)]
struct NewChunk {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
}
/// Closes a chunk in the mutable buffer for writing and starts its migration to
/// the read buffer
#[derive(Debug, clap::Parser)]
struct CloseChunk {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
/// The chunk id
chunk_id: Uuid,
}
/// Unload chunk from read buffer but keep it in object store.
#[derive(Debug, clap::Parser)]
struct UnloadChunk {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
/// The chunk id
chunk_id: Uuid,
}
/// Drop partition from memory and (if persisted) from object store.
#[derive(Debug, clap::Parser)]
struct DropPartition {
/// The name of the database
db_name: String,
/// The partition key
partition_key: String,
/// The table name
table_name: String,
}
/// All possible subcommands for partition
#[derive(Debug, clap::Parser)]
enum Command {
/// List partitions
List(List),
/// Get details about a particular partition
Get(Get),
/// Persist partition.
///
/// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the
/// chunk that contains the persisted data.
Persist(Persist),
/// Compact Object Store Chunks
///
/// Errors if the chunks are not yet compacted and not contiguous.
CompactObjectStoreChunks(CompactObjectStoreChunks),
/// Compact all object store chunks of a given partition
CompactObjectStorePartition(CompactObjectStorePartition),
/// Drop partition from memory and (if persisted) from object store.
Drop(DropPartition),
/// List chunks in a partition
ListChunks(ListChunks),
/// Create a new chunk in the partition
NewChunk(NewChunk),
/// Close the chunk and move to read buffer
CloseChunk(CloseChunk),
/// Unload chunk from read buffer but keep it in object store.
UnloadChunk(UnloadChunk),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = management::Client::new(connection);
match config.command {
Command::List(list) => {
let List { db_name } = list;
let partitions = client.list_partitions(db_name).await?;
let partition_keys = partitions.into_iter().map(|p| p.key).collect::<Vec<_>>();
serde_json::to_writer_pretty(std::io::stdout(), &partition_keys)?;
}
Command::Get(get) => {
let Get {
db_name,
partition_key,
} = get;
let management::generated_types::Partition { key, table_name } =
client.get_partition(db_name, partition_key).await?;
// TODO: get more details from the partition, and print it
// out better (i.e. move to using Partition summary that
// is already in data_types)
#[derive(serde::Serialize)]
struct PartitionDetail {
key: String,
table_name: String,
}
let partition_detail = PartitionDetail { key, table_name };
serde_json::to_writer_pretty(std::io::stdout(), &partition_detail)?;
}
Command::Persist(persist) => {
let Persist {
db_name,
partition_key,
table_name,
force,
all_tables,
all_partitions,
continue_on_error,
} = persist;
let mut has_error = false;
let partition_filter = match (partition_key, all_partitions) {
(Some(partition_key), false) => Some(partition_key),
(None, true) => None,
_ => return Err(Error::MissingPartitionKey),
};
let table_filter = match (table_name, all_tables) {
(Some(table_name), false) => Some(table_name),
(None, true) => None,
_ => return Err(Error::MissingTableName),
};
let mut partition_tables = BTreeSet::new();
let chunks = client.list_chunks(&db_name).await?;
for chunk in chunks {
let partition_mismatch =
matches!(&partition_filter, Some(x) if &chunk.partition_key != x);
let table_mismatch = matches!(&table_filter, Some(x) if &chunk.table_name != x);
let already_persisted = ChunkStorage::try_from(chunk.storage())?.has_object_store();
if !partition_mismatch && !table_mismatch && !already_persisted {
partition_tables.insert((chunk.partition_key, chunk.table_name));
}
}
for (partition, table_name) in partition_tables {
println!(
"Persisting partition: \"{}\", table: \"{}\"",
partition, table_name
);
let result = client
.persist_partition(&db_name, &table_name, &partition, force)
.await;
if let Err(e) = result {
if !continue_on_error {
return Err(e.into());
}
has_error = true;
eprintln!(
"Error persisting partition: \"{}\", table: \"{}\": {}",
partition, table_name, e
)
}
}
match has_error {
true => return Err(Error::ContinuedOnError),
false => println!("Ok"),
}
}
Command::CompactObjectStoreChunks(compact) => {
let CompactObjectStoreChunks {
db_name,
partition_key,
table_name,
chunk_ids,
} = compact;
let chunk_ids = chunk_ids
.iter()
.map(|chunk_id| chunk_id.as_bytes().to_vec().into())
.collect();
let operation = client
.compact_object_store_chunks(db_name, table_name, partition_key, chunk_ids)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::CompactObjectStorePartition(compact) => {
let CompactObjectStorePartition {
db_name,
partition_key,
table_name,
} = compact;
let operation = client
.compact_object_store_partition(db_name, table_name, partition_key)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Drop(drop_partition) => {
let DropPartition {
db_name,
partition_key,
table_name,
} = drop_partition;
client
.drop_partition(db_name, table_name, partition_key)
.await?;
println!("Ok");
}
Command::ListChunks(list_chunks) => {
let ListChunks {
db_name,
partition_key,
} = list_chunks;
let chunks = client.list_partition_chunks(db_name, partition_key).await?;
serde_json::to_writer_pretty(std::io::stdout(), &chunks)?;
}
Command::NewChunk(new_chunk) => {
let NewChunk {
db_name,
partition_key,
table_name,
} = new_chunk;
// Ignore response for now
client
.new_partition_chunk(db_name, table_name, partition_key)
.await?;
println!("Ok");
}
Command::CloseChunk(close_chunk) => {
let CloseChunk {
db_name,
partition_key,
table_name,
chunk_id,
} = close_chunk;
let operation = client
.close_partition_chunk(
db_name,
table_name,
partition_key,
chunk_id.as_bytes().to_vec().into(),
)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::UnloadChunk(close_chunk) => {
let UnloadChunk {
db_name,
partition_key,
table_name,
chunk_id,
} = close_chunk;
client
.unload_partition_chunk(
db_name,
table_name,
partition_key,
chunk_id.as_bytes().to_vec().into(),
)
.await?;
println!("Ok");
}
}
Ok(())
}
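
Aside: the `Persist` arm above validates its mutually exclusive flags by matching on an `(Option<String>, bool)` pair, so exactly one of `--partition-key` and `--all-partitions` (and likewise `--table-name` / `--all-tables`) must be given. A standalone sketch of that pattern (hypothetical helper, not part of this commit):

fn partition_filter(
    partition_key: Option<String>,
    all_partitions: bool,
) -> Result<Option<String>, &'static str> {
    match (partition_key, all_partitions) {
        // --partition-key given: filter to that one partition.
        (Some(key), false) => Ok(Some(key)),
        // --all-partitions given: no filter.
        (None, true) => Ok(None),
        // Neither or both given: reject, mirroring Error::MissingPartitionKey.
        _ => Err("Must either specify a --partition-key or --all-partitions"),
    }
}

fn main() {
    assert_eq!(partition_filter(None, true), Ok(None));
    assert!(partition_filter(None, false).is_err());
    assert!(partition_filter(Some("2022-04-29".into()), true).is_err());
}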

View File

@@ -1,121 +0,0 @@
use generated_types::google::FieldViolation;
use influxdb_iox_client::{connection::Connection, management};
use snafu::{ResultExt, Snafu};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Need to pass `--force`"))]
NeedsTheForceError,
#[snafu(display("Error wiping preserved catalog: {}", source))]
WipeError {
source: influxdb_iox_client::error::Error,
},
#[snafu(display("Error skipping replay: {}", source))]
SkipReplayError {
source: influxdb_iox_client::error::Error,
},
#[snafu(display("Error rebuilding preserved catalog: {}", source))]
RebuildCatalog {
source: influxdb_iox_client::error::Error,
},
#[snafu(display("Received invalid response: {}", source))]
InvalidResponse { source: FieldViolation },
#[snafu(display("Error rendering response as JSON: {}", source))]
WritingJson { source: serde_json::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Recover broken databases.
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// All possible subcommands for recovering broken databases
#[derive(Debug, clap::Parser)]
enum Command {
/// Wipe preserved catalog
Wipe(Wipe),
/// Skip replay
SkipReplay(SkipReplay),
/// Rebuild catalog from parquet files
Rebuild(Rebuild),
}
/// Wipe preserved catalog.
#[derive(Debug, clap::Parser)]
struct Wipe {
/// Force wipe. Required option to prevent accidental erasure
#[clap(long)]
force: bool,
/// The name of the database
db_name: String,
}
/// Rebuild catalog from parquet files
#[derive(Debug, clap::Parser)]
struct Rebuild {
/// Force rebuild, even if the database has already successfully started
#[clap(long)]
force: bool,
/// The name of the database
db_name: String,
}
/// Skip replay
#[derive(Debug, clap::Parser)]
struct SkipReplay {
/// The name of the database
db_name: String,
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = management::Client::new(connection);
match config.command {
Command::Wipe(wipe) => {
let Wipe { force, db_name } = wipe;
if !force {
return Err(Error::NeedsTheForceError);
}
let operation = client
.wipe_preserved_catalog(db_name)
.await
.context(WipeSnafu)?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)
.context(WritingJsonSnafu)?;
}
Command::SkipReplay(skip_replay) => {
let SkipReplay { db_name } = skip_replay;
client.skip_replay(db_name).await.context(SkipReplaySnafu)?;
println!("Ok");
}
Command::Rebuild(rebuild) => {
let operation = client
.rebuild_preserved_catalog(rebuild.db_name, rebuild.force)
.await
.context(RebuildCatalogSnafu)?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)
.context(WritingJsonSnafu)?;
}
}
Ok(())
}
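
Aside: the `.context(WipeSnafu)` calls above use snafu's generated context selectors: `#[derive(Snafu)]` produces one selector per variant (a trailing `Error` is stripped and `Snafu` appended, so `WipeError` yields `WipeSnafu`), and `.context(...)` wraps the source error into that variant. A minimal sketch of the pattern, assuming snafu 0.7-style selector naming:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Error wiping preserved catalog: {}", source))]
    WipeError { source: std::io::Error },
}

fn wipe() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "catalog offline"))
}

fn run() -> Result<(), Error> {
    // Attaches the io::Error as `source` on Error::WipeError.
    wipe().context(WipeSnafu)?;
    Ok(())
}

fn main() {
    println!("{}", run().unwrap_err());
}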

View File

@@ -1,82 +0,0 @@
use influxdb_iox_client::{connection::Connection, management, operations::Client};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
#[error("Output serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage long-running IOx operations
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
#[derive(Debug, clap::Parser)]
enum Command {
/// Get list of running operations
List,
/// Get a specific operation
Get {
/// The id of the operation
id: usize,
},
/// Wait for a specific operation to complete
Wait {
/// The id of the operation
id: usize,
/// Maximum number of nanoseconds to wait before returning current
/// status
nanos: Option<u64>,
},
/// Cancel a specific operation
Cancel {
/// The id of the operation
id: usize,
},
/// Spawns a dummy test operation
Test { nanos: Vec<u64> },
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::List => {
let operations = Client::new(connection).list_operations().await?;
serde_json::to_writer_pretty(std::io::stdout(), &operations)?;
}
Command::Get { id } => {
let operation = Client::new(connection).get_operation(id).await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Wait { id, nanos } => {
let timeout = nanos.map(std::time::Duration::from_nanos);
let operation = Client::new(connection).wait_operation(id, timeout).await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
Command::Cancel { id } => {
Client::new(connection).cancel_operation(id).await?;
println!("Ok");
}
Command::Test { nanos } => {
let operation = management::Client::new(connection)
.create_dummy_job(nanos)
.await?;
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
}
}
Ok(())
}

View File

@@ -1,98 +0,0 @@
//! This module implements the `router` CLI command
use influxdb_iox_client::{
connection::Connection,
router::{self, generated_types::Router as RouterConfig},
};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx routers
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// Create a new router or update an existing one.
#[derive(Debug, clap::Parser)]
struct CreateOrUpdate {
/// The name of the router
name: String,
}
/// Return configuration of specific router
#[derive(Debug, clap::Parser)]
struct Get {
/// The name of the router
name: String,
}
/// Delete specific router
#[derive(Debug, clap::Parser)]
struct Delete {
/// The name of the router
name: String,
}
/// All possible subcommands for router
#[derive(Debug, clap::Parser)]
enum Command {
CreateOrUpdate(CreateOrUpdate),
/// List routers
List,
Get(Get),
Delete(Delete),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::CreateOrUpdate(command) => {
let mut client = router::Client::new(connection);
let config = RouterConfig {
name: command.name.clone(),
..Default::default()
};
client.update_router(config).await?;
println!("Created/Updated router {}", command.name);
}
Command::List => {
let mut client = router::Client::new(connection);
let routers = client.list_routers().await?;
for router in routers {
println!("{}", router.name);
}
}
Command::Get(get) => {
let Get { name } = get;
let mut client = router::Client::new(connection);
let router = client.get_router(&name).await?;
println!("{}", serde_json::to_string_pretty(&router)?);
}
Command::Delete(delete) => {
let Delete { name } = delete;
let mut client = router::Client::new(connection);
client.delete_router(&name).await?;
println!("Deleted router {}", name);
}
}
Ok(())
}

View File

@@ -1,130 +0,0 @@
//! Implementation of command line option for manipulating and showing server
//! config
use std::{
num::NonZeroU32,
time::{Duration, Instant},
};
use crate::commands::server_remote;
use generated_types::database_state::DatabaseState;
use influxdb_iox_client::{connection::Connection, deployment, management};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Remote: {0}")]
RemoteError(#[from] server_remote::Error),
#[error("Request error: {0}")]
Request(#[from] influxdb_iox_client::error::Error),
#[error("Timeout waiting for databases to be loaded")]
TimeoutDatabasesLoaded,
#[error("Server startup error: {0}")]
Server(String),
#[error("Database startup error for \"{0}\": {1}")]
Database(String, String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(name = "server", about = "IOx server commands")]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
#[derive(Debug, clap::Parser)]
enum Command {
/// Set server ID
Set(Set),
/// Get server ID
Get,
/// Wait until server is initialized.
WaitServerInitialized(WaitSeverInitialized),
Remote(Remote),
}
/// Set server ID
#[derive(Debug, clap::Parser)]
struct Set {
/// The server ID to set
id: NonZeroU32,
}
/// Wait until server and all databases are initialized
#[derive(Debug, clap::Parser)]
struct WaitSeverInitialized {
/// Timeout in seconds.
#[clap(short, default_value = "10")]
timeout: u64,
}
#[derive(Debug, clap::Parser)]
struct Remote {
#[clap(subcommand)]
config: server_remote::Config,
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Set(command) => {
let mut client = deployment::Client::new(connection);
client.update_server_id(command.id).await?;
println!("Ok");
Ok(())
}
Command::Get => {
let mut client = deployment::Client::new(connection);
match client.get_server_id().await? {
Some(id) => println!("{}", id.get()),
None => println!("NONE"),
}
Ok(())
}
Command::WaitServerInitialized(command) => {
let mut client = management::Client::new(connection);
let end = Instant::now() + Duration::from_secs(command.timeout);
loop {
let server_status = client.get_server_status().await?;
if let Some(err) = server_status.error {
return Err(Error::Server(err.message));
}
if server_status.initialized {
let mut initialized = true;
for db_status in server_status.database_statuses {
if let Some(err) = db_status.error {
return Err(Error::Database(db_status.db_name, err.message));
}
if db_status.state() != DatabaseState::Initialized {
initialized = false;
break;
}
}
if initialized {
println!("Server initialized.");
return Ok(());
}
}
if Instant::now() >= end {
eprintln!("timeout");
return Err(Error::TimeoutDatabasesLoaded);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Command::Remote(config) => Ok(server_remote::command(connection, config.config).await?),
}
}
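
Aside: `WaitServerInitialized` above is a poll-until-deadline loop: query status, fail fast on any reported error, sleep 100ms between polls, and time out when the deadline passes. A generic sketch of that shape, with `is_ready` standing in for `get_server_status` (hypothetical helper, not part of this commit; requires tokio):

use std::time::{Duration, Instant};

async fn wait_until_ready(
    timeout: Duration,
    mut is_ready: impl FnMut() -> bool,
) -> Result<(), &'static str> {
    let end = Instant::now() + timeout;
    loop {
        if is_ready() {
            return Ok(()); // server (and all databases) initialized
        }
        if Instant::now() >= end {
            return Err("timeout"); // mirrors Error::TimeoutDatabasesLoaded
        }
        // Back off briefly between polls, as the loop above does.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}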

View File

@@ -1,67 +0,0 @@
use crate::TABLE_STYLE_SINGLE_LINE_BORDERS;
use comfy_table::{Cell, Table};
use influxdb_iox_client::{connection::Connection, remote};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(
name = "remote",
about = "Manage configuration about other IOx servers"
)]
pub enum Config {
/// Set connection parameters for a remote IOx server.
Set { id: u32, connection_string: String },
/// Remove a reference to a remote IOx server.
Remove { id: u32 },
/// List configured remote IOx servers.
List,
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config {
Config::Set {
id,
connection_string,
} => {
let mut client = remote::Client::new(connection);
client.update_remote(id, connection_string).await?;
}
Config::Remove { id } => {
let mut client = remote::Client::new(connection);
client.delete_remote(id).await?;
}
Config::List => {
let mut client = remote::Client::new(connection);
let remotes = client.list_remotes().await?;
if remotes.is_empty() {
println!("no remotes configured");
} else {
let mut table = Table::new();
table.load_preset(TABLE_STYLE_SINGLE_LINE_BORDERS);
table.set_header(vec![Cell::new("ID"), Cell::new("Connection string")]);
for i in remotes {
table.add_row(vec![
Cell::new(&format!("{}", i.id)),
Cell::new(&i.connection_string),
]);
}
print!("{}", table);
}
}
};
Ok(())
}
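
Aside: both this file and the removed database `List` command render results with comfy_table and the shared `TABLE_STYLE_SINGLE_LINE_BORDERS` preset. A minimal self-contained sketch of that usage, reusing only calls that appear above (requires the comfy-table crate):

use comfy_table::{Cell, Table};

const TABLE_STYLE_SINGLE_LINE_BORDERS: &str = "||--+-++| ++++++";

fn main() {
    let mut table = Table::new();
    table.load_preset(TABLE_STYLE_SINGLE_LINE_BORDERS);
    table.set_header(vec![Cell::new("ID"), Cell::new("Connection string")]);
    table.add_row(vec![Cell::new("1"), Cell::new("http://127.0.0.1:8082")]);
    // Prints an ASCII table with single-line borders and '+' intersections.
    println!("{}", table);
}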

View File

@@ -27,13 +27,9 @@ mod commands {
pub mod catalog;
pub mod database;
pub mod debug;
pub mod operations;
pub mod remote;
pub mod router;
pub mod run;
pub mod schema;
pub mod server;
pub mod server_remote;
pub mod sql;
pub mod storage;
pub mod tracing;
@@ -54,19 +50,6 @@ static VERSION_STRING: Lazy<String> = Lazy::new(|| {
)
});
/// A comfy_table style that uses single ASCII lines for all borders with plusses at intersections.
///
/// Example:
///
/// ```
/// +------+--------------------------------------+
/// | Name | UUID |
/// +------+--------------------------------------+
/// | bar | ccc2b8bc-f25d-4341-9b64-b9cfe50d26de |
/// | foo | 3317ff2b-bbab-43ae-8c63-f0e9ea2f3bdb |
/// +------+--------------------------------------+
const TABLE_STYLE_SINGLE_LINE_BORDERS: &str = "||--+-++| ++++++";
#[cfg(all(
feature = "heappy",
feature = "jemalloc_replacing_malloc",
@@ -173,18 +156,9 @@ enum Command {
/// Commands to run against remote IOx APIs
Remote(commands::remote::Config),
/// Router-related commands
Router(commands::router::Config),
/// IOx schema configuration commands
Schema(commands::schema::Config),
/// IOx server configuration commands
Server(commands::server::Config),
/// Manage long-running IOx operations
Operation(commands::operations::Config),
/// Start IOx interactive SQL REPL loop
Sql(commands::sql::Config),
@@ -261,22 +235,6 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Operation(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::operations::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Server(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::server::command(connection, config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Remote(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
@@ -285,14 +243,6 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Router(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::router::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Run(config) => {
let _tracing_guard =
handle_init_logs(init_logs_and_tracing(log_verbose_count, &config));

View File

@@ -7,30 +7,15 @@ pub mod health;
/// Client for delete API
pub mod delete;
/// Client for deployment API
pub mod deployment;
/// Client for management API
pub mod management;
/// Client for namespace API
pub mod namespace;
/// Client for remote API
pub mod remote;
/// Client for router API
pub mod router;
/// Client for schema API
pub mod schema;
/// Client for write API
pub mod write;
/// Client for long running operations API
pub mod operations;
#[cfg(feature = "flight")]
/// Client for query API (based on Arrow flight)
pub mod flight;

View File

@@ -1,67 +0,0 @@
use self::generated_types::{deployment_service_client::DeploymentServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
use std::{convert::TryInto, num::NonZeroU32};
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::deployment::v1::*;
}
/// An IOx Deployment API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use std::num::NonZeroU32;
/// use influxdb_iox_client::{
/// deployment::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // Update server ID.
/// let server_id = NonZeroU32::new(42).unwrap();
/// client
/// .update_server_id(server_id)
/// .await
/// .expect("could not update server ID");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: DeploymentServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: DeploymentServiceClient::new(channel),
}
}
/// Set the server's ID.
pub async fn update_server_id(&mut self, id: NonZeroU32) -> Result<(), Error> {
self.inner
.update_server_id(UpdateServerIdRequest { id: id.get() })
.await?;
Ok(())
}
/// Get the server's ID.
pub async fn get_server_id(&mut self) -> Result<Option<NonZeroU32>, Error> {
let response = self.inner.get_server_id(GetServerIdRequest {}).await?;
let maybe_id = response.get_ref().id.try_into().ok();
Ok(maybe_id)
}
}

View File

@@ -43,11 +43,6 @@ impl Client {
}
}
/// Returns `Ok(true)` if the deployment service is serving
pub async fn check_deployment(&mut self) -> Result<bool, Error> {
self.check(generated_types::DEPLOYMENT_SERVICE).await
}
/// Returns `Ok(true)` if the storage service is serving
pub async fn check_storage(&mut self) -> Result<bool, Error> {
self.check(generated_types::STORAGE_SERVICE).await

View File

@@ -1,588 +0,0 @@
use self::generated_types::{management_service_client::ManagementServiceClient, *};
use crate::{
connection::Connection,
error::Error,
google::{longrunning::IoxOperation, OptionalField},
};
use bytes::Bytes;
use std::convert::TryInto;
use uuid::Uuid;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::management::v1::*;
pub use generated_types::influxdata::iox::write_buffer::v1::*;
}
/// An IOx Management API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// management::{Client, generated_types::DatabaseRules},
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // Create a new database!
/// client
/// .create_database(DatabaseRules{
/// name: "bananas".to_string(),
/// ..Default::default()
/// })
/// .await
/// .expect("failed to create database");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: ManagementServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: ManagementServiceClient::new(channel),
}
}
/// Check if databases are loaded and ready for read and write.
pub async fn get_server_status(&mut self) -> Result<ServerStatus, Error> {
let response = self
.inner
.get_server_status(GetServerStatusRequest {})
.await?;
Ok(response
.into_inner()
.server_status
.unwrap_field("server_status")?)
}
/// Creates a new IOx database.
pub async fn create_database(&mut self, rules: DatabaseRules) -> Result<Uuid, Error> {
let response = self
.inner
.create_database(CreateDatabaseRequest { rules: Some(rules) })
.await?;
let server_uuid = response.into_inner().uuid;
let uuid = Uuid::from_slice(&server_uuid)
.map_err(|e| {
format!(
"Could not create UUID from server value {:?}: {}",
server_uuid, e
)
})
.unwrap();
Ok(uuid)
}
/// Updates the configuration for a database.
pub async fn update_database(&mut self, rules: DatabaseRules) -> Result<DatabaseRules, Error> {
let response = self
.inner
.update_database(UpdateDatabaseRequest { rules: Some(rules) })
.await?;
Ok(response.into_inner().rules.unwrap_field("rules")?)
}
/// List databases.
///
/// See [`Self::get_database`] for the semantics of `omit_defaults`
pub async fn list_databases(
&mut self,
omit_defaults: bool,
) -> Result<Vec<DatabaseRules>, Error> {
let response = self
.inner
.list_databases(ListDatabasesRequest { omit_defaults })
.await?;
Ok(response.into_inner().rules)
}
/// List database names
pub async fn list_database_names(&mut self) -> Result<Vec<String>, Error> {
// The value of `omit_defaults` doesn't really matter as the name is present
// in all forms of the config. Pick true to minimize bandwidth.
let omit_defaults = true;
let databases = self.list_databases(omit_defaults).await?;
let names = databases
.iter()
.map(|rules| rules.name.to_string())
.collect::<Vec<_>>();
Ok(names)
}
/// Get database configuration
///
/// If `omit_defaults` is false, return the current configuration
/// that is being used by the server, with all default values
/// filled in.
///
/// If `omit_defaults` is true, returns only the persisted configuration (aka only
/// fields which were supplied when the database was created
/// or last modified via UpdateDatabase)
pub async fn get_database(
&mut self,
name: impl Into<String> + Send,
omit_defaults: bool,
) -> Result<DatabaseRules, Error> {
let response = self
.inner
.get_database(GetDatabaseRequest {
name: name.into(),
omit_defaults,
})
.await?;
Ok(response.into_inner().rules.unwrap_field("rules")?)
}
/// Release database
pub async fn release_database(
&mut self,
db_name: impl Into<String> + Send,
uuid: Option<Uuid>,
) -> Result<Uuid, Error> {
let db_name = db_name.into();
let response = self
.inner
.release_database(ReleaseDatabaseRequest {
db_name: db_name.clone(),
uuid: uuid.map(|u| u.as_bytes().to_vec()).unwrap_or_default(),
})
.await?;
let server_uuid = response.into_inner().uuid;
let uuid = Uuid::from_slice(&server_uuid)
.map_err(|e| {
format!(
"Could not create UUID from server value {:?}: {}",
server_uuid, e
)
})
.unwrap();
Ok(uuid)
}
/// Claim database
///
/// if `force` is true, forces the server to claim this database, even if it is
/// ostensibly owned by another server.
///
/// WARNING: If another server is currently writing to this
/// database, corruption will very likely occur.
pub async fn claim_database(&mut self, uuid: Uuid, force: bool) -> Result<String, Error> {
let uuid_bytes = uuid.as_bytes().to_vec();
let response = self
.inner
.claim_database(ClaimDatabaseRequest {
uuid: uuid_bytes,
force,
})
.await?;
Ok(response.into_inner().db_name)
}
/// List chunks in a database.
pub async fn list_chunks(
&mut self,
db_name: impl Into<String> + Send,
) -> Result<Vec<Chunk>, Error> {
let db_name = db_name.into();
let response = self
.inner
.list_chunks(ListChunksRequest { db_name })
.await?;
Ok(response.into_inner().chunks)
}
/// List all partitions of the database
pub async fn list_partitions(
&mut self,
db_name: impl Into<String> + Send,
) -> Result<Vec<Partition>, Error> {
let db_name = db_name.into();
let response = self
.inner
.list_partitions(ListPartitionsRequest { db_name })
.await?;
Ok(response.into_inner().partitions)
}
/// Get details about a specific partition
pub async fn get_partition(
&mut self,
db_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<Partition, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let response = self
.inner
.get_partition(GetPartitionRequest {
db_name,
partition_key,
})
.await?;
Ok(response.into_inner().partition.unwrap_field("partition")?)
}
/// List chunks in a partition
pub async fn list_partition_chunks(
&mut self,
db_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<Vec<Chunk>, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let response = self
.inner
.list_partition_chunks(ListPartitionChunksRequest {
db_name,
partition_key,
})
.await?;
Ok(response.into_inner().chunks)
}
/// Create a new chunk in a partition
pub async fn new_partition_chunk(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<(), Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
self.inner
.new_partition_chunk(NewPartitionChunkRequest {
db_name,
partition_key,
table_name,
})
.await?;
Ok(())
}
/// Creates a dummy job that, for each value of the `nanos` field,
/// spawns a task that sleeps for that number of nanoseconds before
/// returning
pub async fn create_dummy_job(&mut self, nanos: Vec<u64>) -> Result<IoxOperation, Error> {
let response = self
.inner
.create_dummy_job(CreateDummyJobRequest { nanos })
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Closes the specified chunk in the specified partition and
/// begins it moving to the read buffer.
///
/// Returns the job tracking the data's movement
pub async fn close_partition_chunk(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_id: Bytes,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.close_partition_chunk(ClosePartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Unload chunk from read buffer but keep it in object store.
pub async fn unload_partition_chunk(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_id: Bytes,
) -> Result<(), Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
self.inner
.unload_partition_chunk(UnloadPartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
})
.await?;
Ok(())
}
/// Load a chunk from the object store into the Read Buffer.
pub async fn load_partition_chunk(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_id: Bytes,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.load_partition_chunk(LoadPartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Wipe potential preserved catalog of an uninitialized database.
pub async fn wipe_preserved_catalog(
&mut self,
db_name: impl Into<String> + Send,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let response = self
.inner
.wipe_preserved_catalog(WipePreservedCatalogRequest { db_name })
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Rebuild preserved catalog of an uninitialized database
pub async fn rebuild_preserved_catalog(
&mut self,
db_name: impl Into<String> + Send,
force: bool,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let response = self
.inner
.rebuild_preserved_catalog(RebuildPreservedCatalogRequest { db_name, force })
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Skip replay of an uninitialized database.
pub async fn skip_replay(&mut self, db_name: impl Into<String> + Send) -> Result<(), Error> {
let db_name = db_name.into();
self.inner
.skip_replay(SkipReplayRequest { db_name })
.await?;
Ok(())
}
/// Drop partition from memory and (if persisted) from object store.
pub async fn drop_partition(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<(), Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
self.inner
.drop_partition(DropPartitionRequest {
db_name,
partition_key,
table_name,
})
.await?;
Ok(())
}
/// Persist given partition.
///
/// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the
/// chunk that contains the persisted data.
pub async fn persist_partition(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
force: bool,
) -> Result<(), Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
self.inner
.persist_partition(PersistPartitionRequest {
db_name,
partition_key,
table_name,
force,
})
.await?;
Ok(())
}
/// Compact given object store chunks (db, table, partition, chunks)
///
/// Errors if the chunks are not yet compacted and not contiguous
pub async fn compact_object_store_chunks(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
chunk_ids: Vec<Bytes>,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.compact_object_store_chunks(CompactObjectStoreChunksRequest {
db_name,
partition_key,
table_name,
chunk_ids,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Compact all object store chunks of a given partition
pub async fn compact_object_store_partition(
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
partition_key: impl Into<String> + Send,
) -> Result<IoxOperation, Error> {
let db_name = db_name.into();
let partition_key = partition_key.into();
let table_name = table_name.into();
let response = self
.inner
.compact_object_store_partition(CompactObjectStorePartitionRequest {
db_name,
partition_key,
table_name,
})
.await?;
Ok(response
.into_inner()
.operation
.unwrap_field("operation")?
.try_into()?)
}
/// Shutdown database
pub async fn shutdown_database(
&mut self,
db_name: impl Into<String> + Send,
) -> Result<(), Error> {
let db_name = db_name.into();
self.inner
.shutdown_database(ShutdownDatabaseRequest { db_name })
.await?;
Ok(())
}
/// Restart database
pub async fn restart_database(
&mut self,
db_name: impl Into<String> + Send,
skip_replay: bool,
) -> Result<(), Error> {
let db_name = db_name.into();
self.inner
.restart_database(RestartDatabaseRequest {
db_name,
skip_replay,
})
.await?;
Ok(())
}
}
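
Aside: the management API above carries database UUIDs as raw 16-byte vectors, converted with `Uuid::from_slice` on responses and `uuid.as_bytes().to_vec()` on requests. A small sketch of that round-trip (requires the uuid crate with the v4 feature):

use uuid::Uuid;

fn main() {
    let uuid = Uuid::new_v4();
    // What the client puts on the wire (e.g. the release_database request).
    let wire: Vec<u8> = uuid.as_bytes().to_vec();
    // What the client does with a response (e.g. from create_database).
    let parsed = Uuid::from_slice(&wire).expect("expected 16 bytes");
    assert_eq!(uuid, parsed);
}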

View File

@@ -1,95 +0,0 @@
use self::generated_types::{operations_client::OperationsClient, *};
use crate::connection::Connection;
use crate::error::Error;
use std::convert::TryInto;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::google::longrunning::*;
}
/// An IOx Long Running Operations API client.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// operations::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: OperationsClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: OperationsClient::new(channel),
}
}
/// Get information about all operations
pub async fn list_operations(&mut self) -> Result<Vec<IoxOperation>, Error> {
self.inner
.list_operations(ListOperationsRequest::default())
.await?
.into_inner()
.operations
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()
.map_err(Error::InvalidResponse)
}
/// Get information about a specific operation
pub async fn get_operation(&mut self, id: usize) -> Result<IoxOperation, Error> {
self.inner
.get_operation(GetOperationRequest {
name: id.to_string(),
})
.await?
.into_inner()
.try_into()
.map_err(Error::InvalidResponse)
}
/// Cancel a given operation
pub async fn cancel_operation(&mut self, id: usize) -> Result<(), Error> {
self.inner
.cancel_operation(CancelOperationRequest {
name: id.to_string(),
})
.await?;
Ok(())
}
/// Waits until an operation completes, or the timeout expires, and
/// returns the latest operation metadata
pub async fn wait_operation(
&mut self,
id: usize,
timeout: Option<std::time::Duration>,
) -> Result<IoxOperation, Error> {
self.inner
.wait_operation(WaitOperationRequest {
name: id.to_string(),
timeout: timeout.map(Into::into),
})
.await?
.into_inner()
.try_into()
.map_err(Error::InvalidResponse)
}
}

View File

@@ -1,79 +0,0 @@
use self::generated_types::{remote_service_client::RemoteServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::remote::v1::*;
}
/// An IOx Remote API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// remote::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // List the configured remotes.
/// client
/// .list_remotes()
/// .await
/// .expect("listing remotes failed");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: RemoteServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: RemoteServiceClient::new(channel),
}
}
/// List remotes.
pub async fn list_remotes(&mut self) -> Result<Vec<generated_types::Remote>, Error> {
let response = self.inner.list_remotes(ListRemotesRequest {}).await?;
Ok(response.into_inner().remotes)
}
/// Update remote
pub async fn update_remote(
&mut self,
id: u32,
connection_string: impl Into<String> + Send,
) -> Result<(), Error> {
self.inner
.update_remote(UpdateRemoteRequest {
remote: Some(generated_types::Remote {
id,
connection_string: connection_string.into(),
}),
})
.await?;
Ok(())
}
/// Delete remote
pub async fn delete_remote(&mut self, id: u32) -> Result<(), Error> {
self.inner.delete_remote(DeleteRemoteRequest { id }).await?;
Ok(())
}
}

View File

@@ -1,96 +0,0 @@
use ::generated_types::google::OptionalField;
use self::generated_types::{router_service_client::RouterServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::router::v1::*;
pub use generated_types::influxdata::iox::write_buffer::v1::*;
}
/// An IOx Router API client.
///
/// This client wraps the underlying `tonic` generated client with a
/// more ergonomic interface.
///
/// ```no_run
/// #[tokio::main]
/// # async fn main() {
/// use influxdb_iox_client::{
/// router::Client,
/// connection::Builder,
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .await
/// .unwrap();
///
/// let mut client = Client::new(connection);
///
/// // List routers
/// client
/// .list_routers()
/// .await
/// .expect("listing routers failed");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: RouterServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: RouterServiceClient::new(channel),
}
}
/// Get router
pub async fn get_router(
&mut self,
router_name: &str,
) -> Result<generated_types::Router, Error> {
let response = self
.inner
.get_router(GetRouterRequest {
router_name: router_name.to_string(),
})
.await?;
Ok(response.into_inner().router.unwrap_field("router")?)
}
/// List routers.
pub async fn list_routers(&mut self) -> Result<Vec<generated_types::Router>, Error> {
let response = self.inner.list_routers(ListRoutersRequest {}).await?;
Ok(response.into_inner().routers)
}
/// Update router
pub async fn update_router(&mut self, config: generated_types::Router) -> Result<(), Error> {
self.inner
.update_router(UpdateRouterRequest {
router: Some(config),
})
.await?;
Ok(())
}
/// Delete router
pub async fn delete_router(&mut self, router_name: &str) -> Result<(), Error> {
self.inner
.delete_router(DeleteRouterRequest {
router_name: router_name.to_string(),
})
.await?;
Ok(())
}
}

View File

@@ -1,229 +0,0 @@
//! CLI to create databases.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use std::collections::HashMap;
use clap::{Arg, Command};
use influxdb_iox_client::{
management::generated_types::{
lifecycle_rules, partition_template, DatabaseRules, LifecycleRules, PartitionTemplate,
},
router::generated_types::{
write_sink, Matcher, MatcherToShard, Router, ShardConfig, WriteBufferConnection, WriteSink,
WriteSinkSet,
},
write::generated_types::{column, Column, DatabaseBatch, TableBatch, WriteRequest},
};
#[tokio::main]
async fn main() {
let help = r#"IOx database creator
Examples:
# Create a database named `foo_bar` with the IOx server listening at the default gRPC address:
create_database foo_bar
# Create a database named `myorg_mybucket` with the IOx server listening at
# 127.0.0.1:9000:
create_database --grpc-bind 127.0.0.1:9000 myorg_mybucket
"#;
let matches = Command::new(help)
.about("IOx Database creation script")
.arg(
Arg::new("DATABASE_NAME")
.help("Name of the database to create")
.takes_value(true)
.required(true),
)
.arg(
Arg::new("WRITER")
.long("writer")
.help("The gRPC host and port of the IOx server that should write to Kafka")
.takes_value(true)
.required(true),
)
.arg(
Arg::new("READER")
.long("reader")
.help("The gRPC host and port of the IOx server that should read from Kafka")
.takes_value(true)
.required(true),
)
.arg(
Arg::new("KAFKA")
.long("kafka")
.help("The connection address of the Kafka instance")
.takes_value(true)
.default_value("127.0.0.1:9093"),
)
.get_matches();
let db_name = matches
.value_of("DATABASE_NAME")
.expect("DATABASE_NAME is required")
.to_string();
let writer = matches.value_of("WRITER").expect("WRITER is required");
let reader = matches.value_of("READER").expect("READER is required");
let kafka = matches
.value_of("KAFKA")
.expect("KAFKA has a default value");
// Edit these to whatever DatabaseRules you want to use
let router_config = Router {
name: db_name.clone(),
write_sharder: Some(ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Some(Matcher {
table_name_regex: String::from(".*"),
}),
shard: 1,
}],
hash_ring: None,
}),
write_sinks: HashMap::from([(
1,
WriteSinkSet {
sinks: vec![WriteSink {
sink: Some(write_sink::Sink::WriteBuffer(WriteBufferConnection {
r#type: "kafka".to_string(),
connection: kafka.to_string(),
..Default::default()
})),
ignore_errors: false,
}],
},
)]),
query_sinks: None,
};
let database_rules = DatabaseRules {
name: db_name.clone(),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Time(
"%Y-%m-%d %H:00:00".into(),
)),
}],
}),
lifecycle_rules: Some(LifecycleRules {
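// 1 GiB soft / 2 GiB hard limits on the in-memory buffer (values are bytes).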
buffer_size_soft: 1024 * 1024 * 1024,
buffer_size_hard: 1024 * 1024 * 1024 * 2,
worker_backoff_millis: 100,
max_active_compactions_cfg: Some(
lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(1),
),
persist: true,
persist_row_threshold: 10 * 1000 * 1000,
..Default::default()
}),
worker_cleanup_avg_sleep: None,
write_buffer_connection: Some(WriteBufferConnection {
r#type: "kafka".to_string(),
connection: kafka.to_string(),
..Default::default()
}),
};
// Create the writer db
let writer_grpc_bind_addr = format!("http://{}", writer);
let writer_grpc_channel = influxdb_iox_client::connection::Builder::default()
.build(writer_grpc_bind_addr)
.await
.unwrap();
let mut writer_router_client =
influxdb_iox_client::router::Client::new(writer_grpc_channel.clone());
writer_router_client
.update_router(router_config)
.await
.expect("create router failed");
// Write a few points
let mut write_client = influxdb_iox_client::write::Client::new(writer_grpc_channel);
write_client
.write_pb(test_write(&db_name))
.await
.expect("cannot write");
// Create the reader db
let reader_grpc_bind_addr = format!("http://{}", reader);
let reader_grpc_channel = influxdb_iox_client::connection::Builder::default()
.build(reader_grpc_bind_addr)
.await
.unwrap();
let mut reader_management_client =
influxdb_iox_client::management::Client::new(reader_grpc_channel.clone());
reader_management_client
.create_database(database_rules)
.await
.expect("create reader database failed");
println!("Created database {}", db_name);
}
/// 3 rows of test data
///
/// "write_test,region=west user=23.2 100"
/// "write_test,region=west user=21.0 150"
/// "write_test,region=east bytes=99i 200"
fn test_write(db_name: &str) -> WriteRequest {
WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: db_name.to_string(),
table_batches: vec![TableBatch {
table_name: "write_test".to_string(),
columns: vec![
Column {
column_name: "time".to_string(),
semantic_type: column::SemanticType::Time as _,
values: Some(column::Values {
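// Timestamps for the three rows (nanoseconds since the epoch in IOx).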
i64_values: vec![100, 150, 200],
..Default::default()
}),
null_mask: vec![],
},
Column {
column_name: "region".to_string(),
semantic_type: column::SemanticType::Tag as _,
values: Some(column::Values {
string_values: vec![
"west".to_string(),
"west".to_string(),
"east".to_string(),
],
..Default::default()
}),
null_mask: vec![],
},
Column {
column_name: "user".to_string(),
semantic_type: column::SemanticType::Field as _,
values: Some(column::Values {
f64_values: vec![23.2, 21.0],
..Default::default()
}),
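// Mask bits are LSB first, one bit per row: bit 2 set means row 3 ("east")
// has no `user` value, matching the two f64 values above.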
null_mask: vec![0b00000100],
},
Column {
column_name: "bytes".to_string(),
semantic_type: column::SemanticType::Field as _,
values: Some(column::Values {
i64_values: vec![99],
..Default::default()
}),
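// Bits 0 and 1 set: rows 1 and 2 have no `bytes` value; only row 3
// carries bytes=99i.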
null_mask: vec![0b00000011],
},
],
row_count: 3,
}],
}),
}
}
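As a sanity check on the masks above: a row is null when its bit is set, LSB first, with bit index equal to row index. A small helper illustrating that convention (a sketch for this write-up; `null_mask` is not part of this codebase):

/// Build an LSB-first null mask for `row_count` rows, where `is_null(i)`
/// reports whether row `i` has no value.
fn null_mask(row_count: usize, is_null: impl Fn(usize) -> bool) -> Vec<u8> {
    let mut mask = vec![0u8; (row_count + 7) / 8];
    for row in 0..row_count {
        if is_null(row) {
            mask[row / 8] |= 1 << (row % 8);
        }
    }
    mask
}

#[test]
fn null_mask_matches_test_write() {
    assert_eq!(null_mask(3, |row| row == 2), vec![0b00000100]); // the "user" column
    assert_eq!(null_mask(3, |row| row < 2), vec![0b00000011]); // the "bytes" column
}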

View File

@ -106,29 +106,26 @@ impl ServerType for DatabaseServerType {
#[cfg(test)]
mod tests {
use super::*;
use crate::setup::{make_application, make_server};
use ::http::{header::HeaderName, HeaderValue};
use clap::Parser;
use clap_blocks::run_config::RunConfig;
use data_types::{database_rules::DatabaseRules, DatabaseName};
use futures::pin_mut;
use influxdb_iox_client::{
    connection::Connection,
    flight::{generated_types::ReadInfo, PerformQuery},
};
use ioxd_common::{grpc_listener, http_listener, serve};
use server::rules::ProvidedDatabaseRules;
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64, str::FromStr, time::Duration};
use test_helpers::{assert_contains, assert_error};
use tokio::task::JoinHandle;
use trace::{
span::{Span, SpanStatus},
RingBufferTraceCollector,
};
use trace_exporters::export::{AsyncExporter, TestAsyncExporter};
/// Creates Application and Servers for this test
#[derive(Default)]
@ -431,80 +428,6 @@ mod tests {
(addr, server, join)
}
#[tokio::test]
async fn test_tracing() {
let trace_collector = Arc::new(RingBufferTraceCollector::new(20));
let (addr, server, join) = tracing_server(&trace_collector).await;
let client = influxdb_iox_client::connection::Builder::default()
.build(format!("http://{}", addr))
.await
.unwrap();
let mut client = influxdb_iox_client::management::Client::new(client);
client.list_database_names().await.unwrap();
assert_eq!(trace_collector.spans().len(), 0);
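// No tracing headers were sent above, so nothing was sampled. The X-B3-*
// headers below propagate an externally created span; x-b3-sampled=1 opts
// the request into collection.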
let b3_tracing_client = influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("x-b3-sampled"),
HeaderValue::from_static("1"),
)
.header(
HeaderName::from_static("x-b3-traceid"),
HeaderValue::from_static("fea24902"),
)
.header(
HeaderName::from_static("x-b3-spanid"),
HeaderValue::from_static("ab3409"),
)
.build(format!("http://{}", addr))
.await
.unwrap();
let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client);
b3_tracing_client.list_database_names().await.unwrap();
b3_tracing_client.get_server_status().await.unwrap();
let conn = jaeger_client(addr, "34f9495:30e34:0:1").await;
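// Jaeger trace context format: {trace-id}:{span-id}:{parent-span-id}:{flags};
// the assertions on spans[2] below check exactly these IDs.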
influxdb_iox_client::management::Client::new(conn)
.list_database_names()
.await
.unwrap();
let spans = trace_collector.spans();
assert_eq!(spans.len(), 3);
assert_eq!(spans[0].name, "IOx");
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902);
assert!(spans[0].start.is_some());
assert!(spans[0].end.is_some());
assert_eq!(spans[0].status, SpanStatus::Ok);
assert_eq!(spans[1].name, "IOx");
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902);
assert!(spans[1].start.is_some());
assert!(spans[1].end.is_some());
assert_eq!(spans[2].name, "IOx");
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34);
assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495);
assert!(spans[2].start.is_some());
assert!(spans[2].end.is_some());
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
// shutdown server early
server.shutdown();
let res = join.await.unwrap();
assert_error!(res, ioxd_common::Error::LostServer);
}
/// Ensure that query is fully executed.
async fn consume_query(mut query: PerformQuery) {
while query.next().await.unwrap().is_some() {}
@ -522,18 +445,6 @@ mod tests {
);
// Perform a number of different requests to generate traces
let mut management = influxdb_iox_client::management::Client::new(conn.clone());
management
.create_database(
influxdb_iox_client::management::generated_types::DatabaseRules {
name: db_info.db_name().to_string(),
..Default::default()
},
)
.await
.unwrap();
let mut write = influxdb_iox_client::write::Client::new(conn.clone());
write
.write_lp(db_info.db_name(), "cpu,tag0=foo val=1 100\n", 0)
@ -636,30 +547,6 @@ mod tests {
child(to_string_set, "run_logical_plans").unwrap();
}
#[tokio::test]
async fn test_async_exporter() {
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender)));
let (addr, server, join) = tracing_server(&collector).await;
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
influxdb_iox_client::management::Client::new(conn)
.list_database_names()
.await
.unwrap();
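// Flush any buffered spans out to the test exporter's channel before
// asserting on them.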
collector.drain().await.unwrap();
// early shutdown
server.shutdown();
let res = join.await.unwrap();
assert_error!(res, ioxd_common::Error::LostServer);
let span = receiver.recv().await.unwrap();
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34);
}
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
let db_name = DatabaseName::new(db_name.into()).unwrap();
ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into())