Merge branch 'main' into pd-routing-rules

pull/24376/head
kodiakhq[bot] 2021-03-12 20:02:53 +00:00 committed by GitHub
commit fcd4419702
51 changed files with 2510 additions and 651 deletions

Cargo.lock (generated)
View File

@ -1509,6 +1509,7 @@ dependencies = [
"panic_logging",
"parking_lot",
"predicates",
"prettytable-rs",
"prost",
"query",
"rand 0.7.3",
@ -3206,6 +3207,7 @@ dependencies = [
"flatbuffers 0.6.1",
"futures",
"generated_types",
"hashbrown",
"influxdb_line_protocol",
"mutable_buffer",
"object_store",
@ -3219,6 +3221,7 @@ dependencies = [
"snap",
"test_helpers",
"tokio",
"tokio-util",
"tracing",
"uuid",
]

View File

@ -70,6 +70,8 @@ http = "0.2.0"
hyper = "0.14"
opentelemetry = { version = "0.12", default-features = false, features = ["trace", "tokio-support"] }
opentelemetry-jaeger = { version = "0.11", features = ["tokio"] }
# used by arrow/datafusion anyway
prettytable-rs = "0.8"
prost = "0.7"
# Forked to upgrade hyper and tokio
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }

View File

@ -37,4 +37,4 @@ EXPOSE 8080 8082
ENTRYPOINT ["/usr/bin/influxdb_iox"]
CMD ["server", "run"]
CMD ["run"]

View File

@ -133,7 +133,7 @@ takes its configuration as environment variables.
You can see a list of the current configuration values by running `influxdb_iox
--help`, as well as the specific subcommand config options such as `influxdb_iox
server --help`.
run --help`.
Should you desire specifying config via a file, you can do so using a
`.env` formatted file in the working directory. You can use the
@ -175,7 +175,7 @@ cargo build --release
which will create the corresponding binary in `target/release`:
```shell
./target/release/influxdb_iox server
./target/release/influxdb_iox run
```
Similarly, you can do this in one step with:

data_types/src/chunk.rs (new file)
View File

@ -0,0 +1,176 @@
//! Module contains a representation of chunk metadata
use std::{convert::TryFrom, sync::Arc};
use crate::field_validation::FromField;
use generated_types::{google::FieldViolation, influxdata::iox::management::v1 as management};
use serde::{Deserialize, Serialize};
/// Which storage system is a chunk located in?
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub enum ChunkStorage {
/// The chunk is still open for new writes, in the Mutable Buffer
OpenMutableBuffer,
/// The chunk is no longer open for writes, in the Mutable Buffer
ClosedMutableBuffer,
/// The chunk is in the Read Buffer (where it can not be mutated)
ReadBuffer,
/// The chunk is stored in Object Storage (where it can not be mutated)
ObjectStore,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
/// Represents metadata about a chunk in a database.
/// A chunk can contain one or more tables.
pub struct ChunkSummary {
/// The partition key of this chunk
pub partition_key: Arc<String>,
/// The id of this chunk
pub id: u32,
/// How is this chunk stored?
pub storage: ChunkStorage,
/// The total estimated size of this chunk, in bytes
pub estimated_bytes: usize,
}
/// Conversion code to management API chunk structure
impl From<ChunkSummary> for management::Chunk {
fn from(summary: ChunkSummary) -> Self {
let ChunkSummary {
partition_key,
id,
storage,
estimated_bytes,
} = summary;
let storage: management::ChunkStorage = storage.into();
let storage = storage.into(); // convert to i32
let estimated_bytes = estimated_bytes as u64;
let partition_key = match Arc::try_unwrap(partition_key) {
// no one else has a reference so take the string
Ok(partition_key) => partition_key,
// some other reference exists to this string, so clone it
Err(partition_key) => partition_key.as_ref().clone(),
};
Self {
partition_key,
id,
storage,
estimated_bytes,
}
}
}
impl From<ChunkStorage> for management::ChunkStorage {
fn from(storage: ChunkStorage) -> Self {
match storage {
ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer,
ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer,
ChunkStorage::ReadBuffer => Self::ReadBuffer,
ChunkStorage::ObjectStore => Self::ObjectStore,
}
}
}
/// Conversion code from management API chunk structure
impl TryFrom<management::Chunk> for ChunkSummary {
type Error = FieldViolation;
fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> {
// Use prost enum conversion
let storage = proto.storage().scope("storage")?;
let management::Chunk {
partition_key,
id,
estimated_bytes,
..
} = proto;
let estimated_bytes = estimated_bytes as usize;
let partition_key = Arc::new(partition_key);
Ok(Self {
partition_key,
id,
storage,
estimated_bytes,
})
}
}
impl TryFrom<management::ChunkStorage> for ChunkStorage {
type Error = FieldViolation;
fn try_from(proto: management::ChunkStorage) -> Result<Self, Self::Error> {
match proto {
management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer),
management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer),
management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer),
management::ChunkStorage::ObjectStore => Ok(Self::ObjectStore),
management::ChunkStorage::Unspecified => Err(FieldViolation::required("")),
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn valid_proto_to_summary() {
let proto = management::Chunk {
partition_key: "foo".to_string(),
id: 42,
estimated_bytes: 1234,
storage: management::ChunkStorage::ObjectStore.into(),
};
let summary = ChunkSummary::try_from(proto).expect("conversion successful");
let expected = ChunkSummary {
partition_key: Arc::new("foo".to_string()),
id: 42,
estimated_bytes: 1234,
storage: ChunkStorage::ObjectStore,
};
assert_eq!(
summary, expected,
"Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
summary, expected
);
}
#[test]
fn valid_summary_to_proto() {
let summary = ChunkSummary {
partition_key: Arc::new("foo".to_string()),
id: 42,
estimated_bytes: 1234,
storage: ChunkStorage::ObjectStore,
};
let proto = management::Chunk::try_from(summary).expect("conversion successful");
let expected = management::Chunk {
partition_key: "foo".to_string(),
id: 42,
estimated_bytes: 1234,
storage: management::ChunkStorage::ObjectStore.into(),
};
assert_eq!(
proto, expected,
"Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
proto, expected
);
}
}

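For reference, a minimal sketch (not part of the commit) of how the new `ChunkSummary` conversions are meant to be used; the types come from `data_types` and `generated_types` as shown above, while `round_trip` is a hypothetical helper:

```rust
use std::convert::TryFrom;
use std::sync::Arc;

use data_types::chunk::{ChunkStorage, ChunkSummary};
use generated_types::influxdata::iox::management::v1 as management;

fn round_trip() {
    let summary = ChunkSummary {
        partition_key: Arc::new("1970-01-01T00".to_string()),
        id: 0,
        storage: ChunkStorage::OpenMutableBuffer,
        estimated_bytes: 70,
    };

    // domain type -> protobuf type (the storage enum is stored as an i32)
    let proto: management::Chunk = summary.clone().into();

    // protobuf type -> domain type; fails with a FieldViolation when the
    // storage field is CHUNK_STORAGE_UNSPECIFIED
    let back = ChunkSummary::try_from(proto).expect("valid chunk");
    assert_eq!(back, summary);
}
```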
data_types/src/job.rs (new file)
View File

@ -0,0 +1,24 @@
use generated_types::influxdata::iox::management::v1 as management;
/// Metadata associated with a set of background tasks
/// Used in combination with TrackerRegistry
#[derive(Debug, Clone)]
pub enum Job {
PersistSegment { writer_id: u32, segment_id: u64 },
Dummy { nanos: Vec<u64> },
}
impl From<Job> for management::operation_metadata::Job {
fn from(job: Job) -> Self {
match job {
Job::PersistSegment {
writer_id,
segment_id,
} => Self::PersistSegment(management::PersistSegment {
writer_id,
segment_id,
}),
Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
}
}
}

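A small hypothetical sketch of the `Job` conversion added above; `to_proto` is illustrative only:

```rust
use data_types::job::Job;
use generated_types::influxdata::iox::management::v1 as management;

// `From<Job>` is implemented above, so `.into()` is all a caller needs
fn to_proto(job: Job) -> management::operation_metadata::Job {
    job.into()
}

fn example() {
    let dummy = to_proto(Job::Dummy { nanos: vec![10, 20] });
    let persist = to_proto(Job::PersistSegment { writer_id: 1, segment_id: 42 });
    // both values can be embedded in the `job` oneof of management::OperationMetadata
    let _ = (dummy, persist);
}
```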
View File

@ -10,6 +10,7 @@
clippy::clone_on_ref_ptr
)]
pub use database_name::*;
pub use schema::TIME_COLUMN_NAME;
/// The name of the column containing table names returned by a call to
@ -20,10 +21,12 @@ pub const TABLE_NAMES_COLUMN_NAME: &str = "table";
/// `column_names`.
pub const COLUMN_NAMES_COLUMN_NAME: &str = "column";
pub mod chunk;
pub mod data;
pub mod database_rules;
pub mod error;
pub mod http;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod schema;
@ -32,6 +35,4 @@ pub mod timestamp;
pub mod wal;
mod database_name;
pub use database_name::*;
pub(crate) mod field_validation;

View File

@ -21,4 +21,4 @@ EXPOSE 8080 8082
ENTRYPOINT ["influxdb_iox"]
CMD ["server", "run"]
CMD ["run"]

View File

@ -4,6 +4,15 @@ This directory contains internal design documentation of potential
interest for those who wish to understand how the code works. It is
not intended to be general user-facing documentation.
## IOx Tech Talks
We hold monthly Tech Talks that explain the project's technical underpinnings. You can register for the [InfluxDB IOx Tech Talks here](https://www.influxdata.com/community-showcase/influxdb-tech-talks/), or you can find links to previous sessions below:
* December 2020: Rusty Introduction to Apache Arrow [recording](https://www.youtube.com/watch?v=dQFjKa9vKhM)
* Jan 2021: Data Lifecycle in InfluxDB IOx & How it Uses Object Storage for Persistence [recording](https://www.youtube.com/watch?v=KwdPifHC1Gc)
* February 2021: Intro to the InfluxDB IOx Read Buffer [recording](https://www.youtube.com/watch?v=KslD31VNqPU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-intro-to-the-influxdb-iox-read-buffer-a-readoptimized-inmemory-query-execution-engine)
* March 2021: Query Engine Design and the Rust-Based DataFusion in Apache Arrow [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934)
## Table of Contents:
* Rust style and Idiom guide: [style_guide.md](style_guide.md)

View File

@ -7,7 +7,7 @@
# The full list of available configuration values can be found in
# the command line help (e.g. `env: INFLUXDB_IOX_DB_DIR=`):
#
# ./influxdb_iox server --help
# ./influxdb_iox run --help
#
#
# The identifier for the server. Used for writing to object storage and as

View File

@ -21,14 +21,14 @@ of the object stores, the relevant tests will run.
### Configuration differences when running the tests
When running `influxdb_iox server`, you can pick one object store to use. When running the tests,
When running `influxdb_iox run`, you can pick one object store to use. When running the tests,
you can run them against all the possible object stores. There's still only one
`INFLUXDB_IOX_BUCKET` variable, though, so that will set the bucket name for all configured object
stores. Use the same bucket name when setting up the different services.
Other than possibly configuring multiple object stores, configuring the tests to use the object
store services is the same as configuring the server to use an object store service. See the output
of `influxdb_iox server --help` for instructions.
of `influxdb_iox run --help` for instructions.
## InfluxDB IOx Client

View File

@ -39,7 +39,9 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
idpe_path.join("source.proto"),
management_path.join("base_types.proto"),
management_path.join("database_rules.proto"),
management_path.join("chunk.proto"),
management_path.join("service.proto"),
management_path.join("jobs.proto"),
write_path.join("service.proto"),
root.join("grpc/health/v1/service.proto"),
root.join("google/longrunning/operations.proto"),

View File

@ -0,0 +1,37 @@
syntax = "proto3";
package influxdata.iox.management.v1;
// Which storage system is a chunk located in?
enum ChunkStorage {
// Not currently returned
CHUNK_STORAGE_UNSPECIFIED = 0;
// The chunk is still open for new writes, in the Mutable Buffer
CHUNK_STORAGE_OPEN_MUTABLE_BUFFER = 1;
// The chunk is no longer open for writes, in the Mutable Buffer
CHUNK_STORAGE_CLOSED_MUTABLE_BUFFER = 2;
// The chunk is in the Read Buffer (where it can not be mutated)
CHUNK_STORAGE_READ_BUFFER = 3;
// The chunk is stored in Object Storage (where it can not be mutated)
CHUNK_STORAGE_OBJECT_STORE = 4;
}
// `Chunk` represents part of a partition of data in a database.
// A chunk can contain one or more tables.
message Chunk {
// The partition key of this chunk
string partition_key = 1;
// The id of this chunk
uint32 id = 2;
// Which storage system the chunk is located in
ChunkStorage storage = 3;
// The total estimated size of this chunk, in bytes
uint64 estimated_bytes = 4;
}

View File

@ -0,0 +1,23 @@
syntax = "proto3";
package influxdata.iox.management.v1;
message OperationMetadata {
uint64 cpu_nanos = 1;
uint64 wall_nanos = 2;
uint64 task_count = 3;
uint64 pending_count = 4;
oneof job {
Dummy dummy = 5;
PersistSegment persist_segment = 6;
}
}
message PersistSegment {
uint32 writer_id = 1;
uint64 segment_id = 2;
}
message Dummy {
repeated uint64 nanos = 1;
}

View File

@ -1,8 +1,9 @@
syntax = "proto3";
package influxdata.iox.management.v1;
import "google/protobuf/empty.proto";
import "google/longrunning/operations.proto";
import "influxdata/iox/management/v1/database_rules.proto";
import "influxdata/iox/management/v1/chunk.proto";
service ManagementService {
rpc GetWriterId(GetWriterIdRequest) returns (GetWriterIdResponse);
@ -15,6 +16,9 @@ service ManagementService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
// List chunks available on this database
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
// List remote IOx servers we know about.
rpc ListRemotes(ListRemotesRequest) returns (ListRemotesResponse);
@ -23,6 +27,15 @@ service ManagementService {
// Delete a reference to remote IOx server.
rpc DeleteRemote(DeleteRemoteRequest) returns (DeleteRemoteResponse);
// Creates a dummy job that, for each value of the nanos field,
// spawns a task that sleeps for that number of nanoseconds before returning
rpc CreateDummyJob(CreateDummyJobRequest) returns (CreateDummyJobResponse) {
option (google.longrunning.operation_info) = {
response_type: "google.protobuf.Empty"
metadata_type: "OperationMetadata"
};
}
}
message GetWriterIdRequest {}
@ -57,6 +70,23 @@ message CreateDatabaseRequest {
message CreateDatabaseResponse {}
message ListChunksRequest {
// the name of the database
string db_name = 1;
}
message ListChunksResponse {
repeated Chunk chunks = 1;
}
message CreateDummyJobRequest {
repeated uint64 nanos = 1;
}
message CreateDummyJobResponse {
google.longrunning.Operation operation = 1;
}
message ListRemotesRequest {}
message ListRemotesResponse {

View File

@ -9,7 +9,7 @@ service WriteService {
message WriteRequest {
// name of database into which to write
string name = 1;
string db_name = 1;
// data, in [LineProtocol] format
//

View File

@ -84,6 +84,18 @@ impl FieldViolation {
}
}
impl std::error::Error for FieldViolation {}
impl std::fmt::Display for FieldViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Violation for field \"{}\": {}",
self.field, self.description
)
}
}
fn encode_bad_request(violation: Vec<FieldViolation>) -> Result<Any, EncodeError> {
let mut buffer = BytesMut::new();
@ -106,7 +118,7 @@ fn encode_bad_request(violation: Vec<FieldViolation>) -> Result<Any, EncodeError
impl From<FieldViolation> for tonic::Status {
fn from(f: FieldViolation) -> Self {
let message = format!("Violation for field \"{}\": {}", f.field, f.description);
let message = f.to_string();
match encode_bad_request(vec![f]) {
Ok(details) => encode_status(tonic::Code::InvalidArgument, message, details),

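Sketch (not in the commit) of what the new `Display` impl buys callers; it assumes `FieldViolation::required` behaves as used elsewhere in this diff and that the existing `From` impl still produces an `InvalidArgument` status:

```rust
use generated_types::google::FieldViolation;

fn example() {
    // `required` is the same constructor used elsewhere in this diff
    let violation = FieldViolation::required("storage");

    // The new Display impl renders as:
    //   Violation for field "storage": <description>
    println!("{}", violation);

    // The existing From impl maps it to an InvalidArgument gRPC status,
    // now reusing the Display output as the status message
    let status: tonic::Status = violation.into();
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
}
```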
View File

@ -80,6 +80,14 @@ pub enum GetDatabaseError {
ServerError(tonic::Status),
}
/// Errors returned by Client::list_chunks
#[derive(Debug, Error)]
pub enum ListChunksError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::list_remotes
#[derive(Debug, Error)]
pub enum ListRemotesError {
@ -215,6 +223,21 @@ impl Client {
Ok(rules)
}
/// List chunks in a database.
pub async fn list_chunks(
&mut self,
db_name: impl Into<String>,
) -> Result<Vec<Chunk>, ListChunksError> {
let db_name = db_name.into();
let response = self
.inner
.list_chunks(ListChunksRequest { db_name })
.await
.map_err(ListChunksError::ServerError)?;
Ok(response.into_inner().chunks)
}
/// List remotes.
pub async fn list_remotes(&mut self) -> Result<Vec<generated_types::Remote>, ListRemotesError> {
let response = self

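A hypothetical caller of the new `list_chunks` method; the import path and `print_chunks` helper are assumptions, everything else follows the signatures above:

```rust
// module path assumed; Client and ListChunksError are the types in this file
use influxdb_iox_client::management::{Client, ListChunksError};

async fn print_chunks(client: &mut Client, db_name: &str) -> Result<(), ListChunksError> {
    let chunks = client.list_chunks(db_name).await?;
    for chunk in chunks {
        // fields of the new management::Chunk message (storage is the enum as i32)
        println!(
            "partition={} id={} storage={} bytes={}",
            chunk.partition_key, chunk.id, chunk.storage, chunk.estimated_bytes
        );
    }
    Ok(())
}
```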
View File

@ -61,14 +61,14 @@ impl Client {
/// [LineProtocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format)
pub async fn write(
&mut self,
name: impl Into<String>,
db_name: impl Into<String>,
lp_data: impl Into<String>,
) -> Result<usize, WriteError> {
let name = name.into();
let db_name = db_name.into();
let lp_data = lp_data.into();
let response = self
.inner
.write(WriteRequest { name, lp_data })
.write(WriteRequest { db_name, lp_data })
.await
.map_err(WriteError::ServerError)?;

View File

@ -79,6 +79,14 @@ impl MutableBufferDb {
}
}
/// returns the id of the current open chunk in the specified partition
pub fn open_chunk_id(&self, partition_key: &str) -> u32 {
let partition = self.get_partition(partition_key);
let partition = partition.read().expect("mutex poisoned");
partition.open_chunk_id()
}
/// Directs the writes from batch into the appropriate partitions
fn write_entries_to_partitions(&self, batch: &wal::WriteBufferBatch<'_>) -> Result<()> {
if let Some(entries) = batch.entries() {

View File

@ -107,6 +107,11 @@ impl Partition {
}
}
/// returns the id of the current open chunk in this partition
pub(crate) fn open_chunk_id(&self) -> u32 {
self.open_chunk.id()
}
/// write data to the open chunk
pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> {
assert_eq!(

View File

@ -9,7 +9,8 @@
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;
use data_types::{
data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema, selection::Selection,
chunk::ChunkSummary, data::ReplicatedWrite, partition_metadata::TableSummary, schema::Schema,
selection::Selection,
};
use exec::{stringset::StringSet, Executor};
@ -49,6 +50,9 @@ pub trait Database: Debug + Send + Sync {
/// covering set means that together the chunks make up a single
/// complete copy of the data being queried.
fn chunks(&self, partition_key: &str) -> Vec<Arc<Self::Chunk>>;
/// Return a summary of all chunks in this database, in all partitions
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error>;
}
/// Collection of data that shares the same partition key
@ -60,7 +64,7 @@ pub trait PartitionChunk: Debug + Send + Sync {
/// particular partition.
fn id(&self) -> u32;
/// returns the partition metadata stats for every table in the partition
/// returns the partition metadata stats for every table in the chunk
fn table_stats(&self) -> Result<Vec<TableSummary>, Self::Error>;
/// Returns true if this chunk *might* have data that passes the

View File

@ -154,6 +154,10 @@ impl Database for TestDatabase {
vec![]
}
}
fn chunk_summaries(&self) -> Result<Vec<data_types::chunk::ChunkSummary>, Self::Error> {
unimplemented!("summaries not implemented for TestDatabase")
}
}
#[derive(Debug, Default)]

View File

@ -177,6 +177,25 @@ impl Database {
.unwrap_or_default()
}
/// Returns the total estimated size in bytes for the chunks in the
/// specified partition. Returns None if there is no such partition
pub fn chunks_size<'a>(
&self,
partition_key: &str,
chunk_ids: impl IntoIterator<Item = &'a u32>,
) -> Option<u64> {
let partition_data = self.data.read().unwrap();
let partition = partition_data.partitions.get(partition_key);
partition.map(|partition| {
chunk_ids
.into_iter()
.map(|chunk_id| partition.chunk_size(*chunk_id))
.sum::<u64>()
})
}
/// Returns the total estimated size in bytes of the database.
pub fn size(&self) -> u64 {
let base_size = std::mem::size_of::<Self>();
@ -662,6 +681,16 @@ impl Partition {
.map(|chunk| std::mem::size_of::<u32>() as u64 + chunk.size())
.sum::<u64>()
}
/// The total estimated size in bytes of the specified chunk id
pub fn chunk_size(&self, chunk_id: u32) -> u64 {
let chunk_data = self.data.read().unwrap();
chunk_data
.chunks
.get(&chunk_id)
.map(|chunk| chunk.size())
.unwrap_or(0) // treat unknown chunks as zero size
}
}
/// ReadFilterResults implements ...

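Minimal sketch of using the new `chunks_size` accessor; `partition_chunks_size` is a hypothetical helper and `Database` is the read buffer type shown above:

```rust
use read_buffer::Database;

/// Total estimated size, in bytes, of the given chunks in one partition.
/// Returns 0 when the partition does not exist.
fn partition_chunks_size(db: &Database, partition_key: &str, chunk_ids: &[u32]) -> u64 {
    db.chunks_size(partition_key, chunk_ids).unwrap_or(0)
}
```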
View File

@ -14,6 +14,7 @@ data_types = { path = "../data_types" }
flatbuffers = "0.6"
futures = "0.3.7"
generated_types = { path = "../generated_types" }
hashbrown = "0.9.1"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
mutable_buffer = { path = "../mutable_buffer" }
object_store = { path = "../object_store" }
@ -26,6 +27,7 @@ serde_json = "1.0"
snafu = "0.6"
snap = "1.0.0"
tokio = { version = "1.0", features = ["macros", "time"] }
tokio-util = { version = "0.6.3" }
tracing = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }

View File

@ -15,7 +15,7 @@ use std::{
sync::Arc,
};
use crate::tracker::{TrackedFutureExt, TrackerRegistry};
use crate::tracker::{TrackedFutureExt, TrackerRegistration};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use crc32fast::Hasher;
@ -73,12 +73,6 @@ pub enum Error {
InvalidFlatbuffersSegment,
}
#[derive(Debug, Clone)]
pub struct SegmentPersistenceTask {
writer_id: u32,
location: object_store::path::Path,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// An in-memory buffer of a write ahead log. It is split up into segments,
@ -376,7 +370,7 @@ impl Segment {
/// the given object store location.
pub fn persist_bytes_in_background(
&self,
reg: &TrackerRegistry<SegmentPersistenceTask>,
tracker: TrackerRegistration,
writer_id: u32,
db_name: &DatabaseName<'_>,
store: Arc<ObjectStore>,
@ -385,11 +379,6 @@ impl Segment {
let location = database_object_store_path(writer_id, db_name, &store);
let location = object_store_path_for_segment(&location, self.id)?;
let task_meta = SegmentPersistenceTask {
writer_id,
location: location.clone(),
};
let len = data.len();
let mut stream_data = std::io::Result::Ok(data.clone());
@ -414,7 +403,7 @@ impl Segment {
// TODO: Mark segment as persisted
info!("persisted data to {}", location.display());
}
.track(reg, task_meta),
.track(tracker),
);
Ok(())

View File

@ -10,7 +10,9 @@ use std::{
};
use async_trait::async_trait;
use data_types::{data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection};
use data_types::{
chunk::ChunkSummary, data::ReplicatedWrite, database_rules::DatabaseRules, selection::Selection,
};
use mutable_buffer::MutableBufferDb;
use parking_lot::Mutex;
use query::{Database, PartitionChunk};
@ -121,7 +123,7 @@ impl Db {
local_store
.rollover_partition(partition_key)
.context(RollingPartition)
.map(DBChunk::new_mb)
.map(|c| DBChunk::new_mb(c, partition_key, false))
} else {
DatatbaseNotWriteable {}.fail()
}
@ -131,10 +133,15 @@ impl Db {
// potentially be migrated into the read buffer or object store)
pub fn mutable_buffer_chunks(&self, partition_key: &str) -> Vec<Arc<DBChunk>> {
let chunks = if let Some(mutable_buffer) = self.mutable_buffer.as_ref() {
let open_chunk_id = mutable_buffer.open_chunk_id(partition_key);
mutable_buffer
.chunks(partition_key)
.into_iter()
.map(DBChunk::new_mb)
.map(|c| {
let open = c.id() == open_chunk_id;
DBChunk::new_mb(c, partition_key, open)
})
.collect()
} else {
vec![]
@ -162,7 +169,7 @@ impl Db {
.as_ref()
.context(DatatbaseNotWriteable)?
.drop_chunk(partition_key, chunk_id)
.map(DBChunk::new_mb)
.map(|c| DBChunk::new_mb(c, partition_key, false))
.context(MutableBufferDrop)
}
@ -313,6 +320,21 @@ impl Database for Db {
.partition_keys()
.context(MutableBufferRead)
}
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
let summaries = self
.partition_keys()?
.into_iter()
.map(|partition_key| {
self.mutable_buffer_chunks(&partition_key)
.into_iter()
.chain(self.read_buffer_chunks(&partition_key).into_iter())
.map(|c| c.summary())
})
.flatten()
.collect();
Ok(summaries)
}
}
#[cfg(test)]
@ -324,8 +346,9 @@ mod tests {
use arrow_deps::{
arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect,
};
use data_types::database_rules::{
MutableBufferConfig, Order, PartitionSort, PartitionSortRules,
use data_types::{
chunk::ChunkStorage,
database_rules::{MutableBufferConfig, Order, PartitionSort, PartitionSortRules},
};
use query::{
exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk,
@ -559,6 +582,75 @@ mod tests {
db.rules.mutable_buffer_config = Some(mbconf);
}
#[tokio::test]
async fn chunk_summaries() {
// Test that chunk id listing is hooked up
let db = make_db();
let mut writer = TestLPWriter::default();
// get three chunks: one open, one closed in mb and one closed in rb
writer.write_lp_string(&db, "cpu bar=1 1").await.unwrap();
db.rollover_partition("1970-01-01T00").await.unwrap();
writer
.write_lp_string(&db, "cpu bar=1,baz=2 2")
.await
.unwrap();
// a fourth chunk in a different partition
writer
.write_lp_string(&db, "cpu bar=1,baz2,frob=3 400000000000000")
.await
.unwrap();
print!("Partitions: {:?}", db.partition_keys().unwrap());
db.load_chunk_to_read_buffer("1970-01-01T00", 0)
.await
.unwrap();
fn to_arc(s: &str) -> Arc<String> {
Arc::new(s.to_string())
}
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
chunk_summaries.sort_unstable();
let expected = vec![
ChunkSummary {
partition_key: to_arc("1970-01-01T00"),
id: 0,
storage: ChunkStorage::ClosedMutableBuffer,
estimated_bytes: 70,
},
ChunkSummary {
partition_key: to_arc("1970-01-01T00"),
id: 0,
storage: ChunkStorage::ReadBuffer,
estimated_bytes: 1221,
},
ChunkSummary {
partition_key: to_arc("1970-01-01T00"),
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 101,
},
ChunkSummary {
partition_key: to_arc("1970-01-05T15"),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 107,
},
];
assert_eq!(
expected, chunk_summaries,
"expected:\n{:#?}\n\nactual:{:#?}\n\n",
expected, chunk_summaries
);
}
// run a sql query against the database, returning the results as record batches
async fn run_query(db: &Db, query: &str) -> Vec<RecordBatch> {
let planner = SQLQueryPlanner::default();

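For illustration (not part of the commit), a sketch of a caller walking `Db::chunk_summaries`; `describe_chunks` is hypothetical and the `server::db::Db` path is assumed:

```rust
use data_types::chunk::ChunkStorage;
use query::Database; // brings the `chunk_summaries` trait method into scope
use server::db::Db;  // path assumed

/// Human-readable description of every chunk in every partition of `db`
fn describe_chunks(db: &Db) -> Vec<String> {
    let mut summaries = db.chunk_summaries().expect("listing chunks");
    // ChunkSummary derives Ord, so the output order is deterministic
    summaries.sort_unstable();
    summaries
        .into_iter()
        .map(|c| {
            let storage = match c.storage {
                ChunkStorage::OpenMutableBuffer => "open (mutable buffer)",
                ChunkStorage::ClosedMutableBuffer => "closed (mutable buffer)",
                ChunkStorage::ReadBuffer => "read buffer",
                ChunkStorage::ObjectStore => "object store",
            };
            format!(
                "partition {} chunk {} in {}: ~{} bytes",
                c.partition_key, c.id, storage, c.estimated_bytes
            )
        })
        .collect()
}
```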
View File

@ -1,5 +1,9 @@
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;
use data_types::{schema::Schema, selection::Selection};
use data_types::{
chunk::{ChunkStorage, ChunkSummary},
schema::Schema,
selection::Selection,
};
use mutable_buffer::chunk::Chunk as MBChunk;
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
@ -58,10 +62,13 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub enum DBChunk {
MutableBuffer {
chunk: Arc<MBChunk>,
partition_key: Arc<String>,
/// is this chunk open for writing?
open: bool,
},
ReadBuffer {
db: Arc<ReadBufferDb>,
partition_key: String,
partition_key: Arc<String>,
chunk_id: u32,
},
ParquetFile, // TODO add appropriate type here
@ -69,8 +76,17 @@ pub enum DBChunk {
impl DBChunk {
/// Create a new mutable buffer chunk
pub fn new_mb(chunk: Arc<mutable_buffer::chunk::Chunk>) -> Arc<Self> {
Arc::new(Self::MutableBuffer { chunk })
pub fn new_mb(
chunk: Arc<mutable_buffer::chunk::Chunk>,
partition_key: impl Into<String>,
open: bool,
) -> Arc<Self> {
let partition_key = Arc::new(partition_key.into());
Arc::new(Self::MutableBuffer {
chunk,
partition_key,
open,
})
}
/// create a new read buffer chunk
@ -79,13 +95,54 @@ impl DBChunk {
partition_key: impl Into<String>,
chunk_id: u32,
) -> Arc<Self> {
let partition_key = partition_key.into();
let partition_key = Arc::new(partition_key.into());
Arc::new(Self::ReadBuffer {
db,
chunk_id,
partition_key,
})
}
pub fn summary(&self) -> ChunkSummary {
match self {
Self::MutableBuffer {
chunk,
partition_key,
open,
} => {
let storage = if *open {
ChunkStorage::OpenMutableBuffer
} else {
ChunkStorage::ClosedMutableBuffer
};
ChunkSummary {
partition_key: Arc::clone(partition_key),
id: chunk.id(),
storage,
estimated_bytes: chunk.size(),
}
}
Self::ReadBuffer {
db,
partition_key,
chunk_id,
} => {
let estimated_bytes = db
.chunks_size(partition_key.as_ref(), &[*chunk_id])
.unwrap_or(0) as usize;
ChunkSummary {
partition_key: Arc::clone(&partition_key),
id: *chunk_id,
storage: ChunkStorage::ReadBuffer,
estimated_bytes,
}
}
Self::ParquetFile => {
unimplemented!("parquet file summary not implemented")
}
}
}
}
#[async_trait]
@ -94,7 +151,7 @@ impl PartitionChunk for DBChunk {
fn id(&self) -> u32 {
match self {
Self::MutableBuffer { chunk } => chunk.id(),
Self::MutableBuffer { chunk, .. } => chunk.id(),
Self::ReadBuffer { chunk_id, .. } => *chunk_id,
Self::ParquetFile => unimplemented!("parquet file not implemented"),
}
@ -104,7 +161,7 @@ impl PartitionChunk for DBChunk {
&self,
) -> Result<Vec<data_types::partition_metadata::TableSummary>, Self::Error> {
match self {
Self::MutableBuffer { chunk } => chunk.table_stats().context(MutableBufferChunk),
Self::MutableBuffer { chunk, .. } => chunk.table_stats().context(MutableBufferChunk),
Self::ReadBuffer { .. } => unimplemented!("read buffer not implemented"),
Self::ParquetFile => unimplemented!("parquet file not implemented"),
}
@ -116,7 +173,7 @@ impl PartitionChunk for DBChunk {
_known_tables: &StringSet,
) -> Result<Option<StringSet>, Self::Error> {
let names = match self {
Self::MutableBuffer { chunk } => {
Self::MutableBuffer { chunk, .. } => {
if chunk.is_empty() {
Some(StringSet::new())
} else {
@ -192,7 +249,7 @@ impl PartitionChunk for DBChunk {
selection: Selection<'_>,
) -> Result<Schema, Self::Error> {
match self {
DBChunk::MutableBuffer { chunk } => chunk
DBChunk::MutableBuffer { chunk, .. } => chunk
.table_schema(table_name, selection)
.context(MutableBufferChunk),
DBChunk::ReadBuffer {
@ -235,7 +292,7 @@ impl PartitionChunk for DBChunk {
fn has_table(&self, table_name: &str) -> bool {
match self {
Self::MutableBuffer { chunk } => chunk.has_table(table_name),
Self::MutableBuffer { chunk, .. } => chunk.has_table(table_name),
Self::ReadBuffer {
db,
partition_key,
@ -257,7 +314,7 @@ impl PartitionChunk for DBChunk {
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, Self::Error> {
match self {
Self::MutableBuffer { chunk } => {
Self::MutableBuffer { chunk, .. } => {
// Note MutableBuffer doesn't support predicate
// pushdown (other than pruning out the entire chunk
// via `might_pass_predicate`)
@ -341,7 +398,7 @@ impl PartitionChunk for DBChunk {
columns: Selection<'_>,
) -> Result<Option<StringSet>, Self::Error> {
match self {
Self::MutableBuffer { chunk } => {
Self::MutableBuffer { chunk, .. } => {
let chunk_predicate = match to_mutable_buffer_predicate(chunk, predicate) {
Ok(chunk_predicate) => chunk_predicate,
Err(e) => {
@ -389,7 +446,7 @@ impl PartitionChunk for DBChunk {
predicate: &Predicate,
) -> Result<Option<StringSet>, Self::Error> {
match self {
Self::MutableBuffer { chunk } => {
Self::MutableBuffer { chunk, .. } => {
use mutable_buffer::chunk::Error::UnsupportedColumnTypeForListingValues;
let chunk_predicate = match to_mutable_buffer_predicate(chunk, predicate) {

View File

@ -67,43 +67,44 @@
clippy::clone_on_ref_ptr
)]
pub mod buffer;
mod config;
pub mod db;
pub mod snapshot;
mod tracker;
#[cfg(test)]
mod query_tests;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use crate::{
buffer::SegmentPersistenceTask,
config::{
object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME,
},
db::Db,
tracker::TrackerRegistry,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::TryStreamExt;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{error, info};
use data_types::{
data::{lines_to_replicated_write, ReplicatedWrite},
database_rules::DatabaseRules,
database_rules::{DatabaseRules, WriterId},
job::Job,
{DatabaseName, DatabaseNameError},
};
use influxdb_line_protocol::ParsedLine;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore};
use async_trait::async_trait;
use bytes::Bytes;
use data_types::database_rules::WriterId;
use futures::stream::TryStreamExt;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::error;
use crate::tracker::TrackedFutureExt;
use crate::{
config::{
object_store_path_for_database_config, Config, GRPCConnectionString, DB_RULES_FILE_NAME,
},
db::Db,
tracker::{Tracker, TrackerId, TrackerRegistry},
};
pub mod buffer;
mod config;
pub mod db;
pub mod snapshot;
pub mod tracker;
#[cfg(test)]
mod query_tests;
type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -157,7 +158,7 @@ pub struct Server<M: ConnectionManager> {
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
executor: Arc<Executor>,
segment_persistence_registry: TrackerRegistry<SegmentPersistenceTask>,
jobs: TrackerRegistry<Job>,
}
impl<M: ConnectionManager> Server<M> {
@ -168,7 +169,7 @@ impl<M: ConnectionManager> Server<M> {
store,
connection_manager: Arc::new(connection_manager),
executor: Arc::new(Executor::new()),
segment_persistence_registry: TrackerRegistry::new(),
jobs: TrackerRegistry::new(),
}
}
@ -348,13 +349,14 @@ impl<M: ConnectionManager> Server<M> {
if persist {
let writer_id = self.require_id()?;
let store = Arc::clone(&self.store);
let (_, tracker) = self.jobs.register(Job::PersistSegment {
writer_id,
segment_id: segment.id,
});
segment
.persist_bytes_in_background(
&self.segment_persistence_registry,
writer_id,
db_name,
store,
)
.persist_bytes_in_background(tracker, writer_id, db_name, store)
.context(WalError)?;
}
}
@ -382,6 +384,50 @@ impl<M: ConnectionManager> Server<M> {
pub fn delete_remote(&self, id: WriterId) -> Option<GRPCConnectionString> {
self.config.delete_remote(id)
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> Tracker<Job> {
let (tracker, registration) = self.jobs.register(Job::Dummy {
nanos: nanos.clone(),
});
for duration in nanos {
tokio::spawn(
tokio::time::sleep(tokio::time::Duration::from_nanos(duration))
.track(registration.clone()),
);
}
tracker
}
/// Returns a list of all jobs tracked by this server
pub fn tracked_jobs(&self) -> Vec<Tracker<Job>> {
self.jobs.tracked()
}
/// Returns a specific job tracked by this server
pub fn get_job(&self, id: TrackerId) -> Option<Tracker<Job>> {
self.jobs.get(id)
}
/// Background worker function
///
/// TODO: Handle termination (#827)
pub async fn background_worker(&self) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
// TODO: Retain limited history of past jobs, e.g. enqueue returned data into a
// Dequeue
let reclaimed = self.jobs.reclaim();
for job in reclaimed {
info!(?job, "job finished");
}
interval.tick().await;
}
}
}
#[async_trait]
@ -508,20 +554,24 @@ async fn get_store_bytes(
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::Segment;
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use std::collections::BTreeMap;
use async_trait::async_trait;
use futures::TryStreamExt;
use parking_lot::Mutex;
use snafu::Snafu;
use arrow_deps::{assert_table_eq, datafusion::physical_plan::collect};
use data_types::database_rules::{
PartitionTemplate, TemplatePart, WalBufferConfig, WalBufferRollover,
};
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use object_store::{memory::InMemory, path::ObjectStorePath};
use parking_lot::Mutex;
use query::frontend::sql::SQLQueryPlanner;
use snafu::Snafu;
use std::collections::BTreeMap;
use crate::buffer::Segment;
use super::*;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = TestError> = std::result::Result<T, E>;

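Hypothetical sketch of the consumer side of the new job tracking API on `Server` (`get_job` plus `Tracker::cancel`); crate paths and the function name are illustrative:

```rust
use server::tracker::TrackerId; // path assumed
use server::{ConnectionManager, Server}; // path assumed

/// Ask all futures belonging to job `id` to stop; returns false if the job
/// is unknown (e.g. already reclaimed by the background worker)
fn cancel_job<M: ConnectionManager>(server: &Server<M>, id: TrackerId) -> bool {
    match server.get_job(id) {
        Some(tracker) => {
            // cancellation is cooperative: running futures stop the next
            // time they yield and then resolve to Err(Aborted)
            tracker.cancel();
            true
        }
        None => false,
    }
}
```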
View File

@ -442,7 +442,7 @@ mem,host=A,region=west used=45 1
];
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let chunk = DBChunk::new_mb(Arc::new(ChunkWB::new(11)));
let chunk = DBChunk::new_mb(Arc::new(ChunkWB::new(11)), "key", false);
let mut metadata_path = store.new_path();
metadata_path.push_dir("meta");

View File

@ -1,241 +1,592 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
//! This module contains a future tracking system supporting fanout,
//! cancellation and asynchronous signalling of completion
//!
//! A Tracker is created by calling TrackerRegistry::register. TrackedFutures
//! can then be associated with this Tracker and monitored and/or cancelled.
//!
//! This is used within IOx to track futures spawned as multiple tokio tasks.
//!
//! For example, when migrating a chunk from the mutable buffer to the read
//! buffer:
//!
//! - There is a single over-arching Job being performed
//! - A single tracker is allocated from a TrackerRegistry in Server and
//! associated with the Job metadata
//! - This tracker is registered with every future that is spawned as a tokio
//! task
//!
//! This same system may in future form part of a query tracking story
//!
//! # Correctness
//!
//! The key correctness property of the Tracker system is Tracker::is_complete
//! only returns true when all futures associated with the tracker have
//! completed and no more can be spawned. Additionally at such a point
//! all metrics - cpu_nanos, wall_nanos, created_futures should be visible
//! to the calling thread
//!
//! Note: there is no guarantee that pending_registrations or pending_futures
//! ever reaches 0, a program could call mem::forget on a TrackerRegistration,
//! leak the TrackerRegistration, spawn a future that never finishes, etc...
//! Such a program would never consider the Tracker complete and therefore this
//! doesn't violate the correctness property
//!
//! ## Proof
//!
//! 1. pending_registrations starts at 1, and is incremented on
//! TrackerRegistration::clone. A TrackerRegistration cannot be created from an
//! existing TrackerState, only another TrackerRegistration
//!
//! 2. pending_registrations is decremented with release semantics on
//! TrackerRegistration::drop
//!
//! 3. pending_futures is only incremented with a TrackerRegistration in scope
//!
//! 4. 2. + 3. -> A thread that increments pending_futures, decrements
//! pending_registrations with release semantics afterwards. By definition of
//! release semantics these writes to pending_futures cannot be reordered to
//! come after the atomic decrement of pending_registrations
//!
//! 5. 1. + 2. + drop cannot be called multiple times on the same object -> once
//! pending_registrations is decremented to 0 it can never be incremented again
//!
//! 6. 4. + 5. -> the decrement to 0 of pending_registrations must commit after
//! the last increment of pending_futures
//!
//! 7. pending_registrations is loaded with acquire semantics
//!
//! 8. By definition of acquire semantics, any thread that reads
//! pending_registrations is guaranteed to see any increments to pending_futures
//! performed before the most recent decrement of pending_registrations
//!
//! 9. 6. + 8. -> A thread that observes a pending_registrations of 0 cannot
//! subsequently observe pending_futures to increase
//!
//! 10. Tracker::is_complete returns if it observes pending_registrations to be
//! 0 and then pending_futures to be 0
//!
//! 11. 9 + 10 -> A thread can only observe Tracker::is_complete() == true
//! after all futures have been dropped and no more can be created
//!
//! 12. pending_futures is decremented with Release semantics on
//! TrackedFuture::drop after any associated metrics have been incremented
//!
//! 13. pending_futures is loaded with acquire semantics
//!
//! 14. 12. + 13. -> A thread that observes a pending_futures of 0 is guaranteed
//! to see any metrics from any dropped TrackedFuture
//!
//! Note: this proof ignores the complexity of moving Trackers, TrackedFutures,
//! etc... between threads as any such functionality must perform the necessary
//! synchronisation to be well-formed.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use futures::prelude::*;
use parking_lot::Mutex;
use pin_project::{pin_project, pinned_drop};
use tokio_util::sync::CancellationToken;
use tracing::warn;
/// Every future registered with a `TrackerRegistry` is assigned a unique
/// `TrackerId`
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TrackerId(usize);
pub use future::{TrackedFuture, TrackedFutureExt};
pub use registry::{TrackerId, TrackerRegistry};
mod future;
mod registry;
/// The state shared between all sibling tasks
#[derive(Debug)]
struct Tracker<T> {
data: T,
abort: future::AbortHandle,
struct TrackerState {
start_instant: Instant,
cancel_token: CancellationToken,
cpu_nanos: AtomicUsize,
wall_nanos: AtomicUsize,
created_futures: AtomicUsize,
pending_futures: AtomicUsize,
pending_registrations: AtomicUsize,
watch: tokio::sync::watch::Receiver<bool>,
}
/// A Tracker can be used to monitor/cancel/wait for a set of associated futures
#[derive(Debug)]
struct TrackerContextInner<T> {
id: AtomicUsize,
trackers: Mutex<HashMap<TrackerId, Tracker<T>>>,
pub struct Tracker<T> {
id: TrackerId,
state: Arc<TrackerState>,
metadata: Arc<T>,
}
/// Allows tracking the lifecycle of futures registered by
/// `TrackedFutureExt::track` with an accompanying metadata payload of type T
///
/// Additionally can trigger graceful termination of registered futures
#[derive(Debug)]
pub struct TrackerRegistry<T> {
inner: Arc<TrackerContextInner<T>>,
}
// Manual Clone to workaround https://github.com/rust-lang/rust/issues/26925
impl<T> Clone for TrackerRegistry<T> {
impl<T> Clone for Tracker<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
id: self.id,
state: Arc::clone(&self.state),
metadata: Arc::clone(&self.metadata),
}
}
}
impl<T> Default for TrackerRegistry<T> {
fn default() -> Self {
Self {
inner: Arc::new(TrackerContextInner {
id: AtomicUsize::new(0),
trackers: Mutex::new(Default::default()),
}),
}
}
}
impl<T> TrackerRegistry<T> {
pub fn new() -> Self {
Default::default()
impl<T> Tracker<T> {
/// Returns the ID of the Tracker - these are unique per TrackerRegistry
pub fn id(&self) -> TrackerId {
self.id
}
/// Trigger graceful termination of a registered future
///
/// Returns false if no future found with the provided ID
/// Returns a reference to the metadata stored within this Tracker
pub fn metadata(&self) -> &T {
&self.metadata
}
/// Trigger graceful termination of any futures tracked by
/// this tracker
///
/// Note: If the future is currently executing, termination
/// will only occur when the future yields (returns from poll)
#[allow(dead_code)]
pub fn terminate(&self, id: TrackerId) -> bool {
if let Some(meta) = self.inner.trackers.lock().get_mut(&id) {
meta.abort.abort();
true
} else {
false
/// and is then scheduled to run again
pub fn cancel(&self) {
self.state.cancel_token.cancel();
}
/// Returns the number of outstanding futures
pub fn pending_futures(&self) -> usize {
self.state.pending_futures.load(Ordering::Relaxed)
}
/// Returns the number of TrackedFutures created with this Tracker
pub fn created_futures(&self) -> usize {
self.state.created_futures.load(Ordering::Relaxed)
}
/// Returns the number of nanoseconds futures tracked by this
/// tracker have spent executing
pub fn cpu_nanos(&self) -> usize {
self.state.cpu_nanos.load(Ordering::Relaxed)
}
/// Returns the number of nanoseconds since the Tracker was registered
/// to the time the last TrackedFuture was dropped
///
/// Returns 0 if there are still pending tasks
pub fn wall_nanos(&self) -> usize {
if !self.is_complete() {
return 0;
}
self.state.wall_nanos.load(Ordering::Relaxed)
}
fn untrack(&self, id: &TrackerId) {
self.inner.trackers.lock().remove(id);
/// Returns true if all futures associated with this tracker have
/// been dropped and no more can be created
pub fn is_complete(&self) -> bool {
// The atomic decrement in TrackerRegistration::drop has release semantics
// acquire here ensures that if a thread observes the tracker to have
// no pending_registrations it cannot subsequently observe pending_futures
// to increase. If it could, observing pending_futures==0 would be insufficient
// to conclude there are no outstanding futures
let pending_registrations = self.state.pending_registrations.load(Ordering::Acquire);
// The atomic decrement in TrackedFuture::drop has release semantics
// acquire therefore ensures that if a thread observes the completion of
// a TrackedFuture, it is guaranteed to see its updates (e.g. wall_nanos)
let pending_futures = self.state.pending_futures.load(Ordering::Acquire);
pending_registrations == 0 && pending_futures == 0
}
fn track(&self, metadata: T) -> (TrackerId, future::AbortRegistration) {
let id = TrackerId(self.inner.id.fetch_add(1, Ordering::Relaxed));
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
self.inner.trackers.lock().insert(
id,
Tracker {
abort: abort_handle,
data: metadata,
},
);
(id, abort_registration)
/// Returns if this tracker has been cancelled
pub fn is_cancelled(&self) -> bool {
self.state.cancel_token.is_cancelled()
}
}
impl<T: Clone> TrackerRegistry<T> {
/// Returns a list of tracked futures, with their accompanying IDs and
/// metadata
#[allow(dead_code)]
pub fn tracked(&self) -> Vec<(TrackerId, T)> {
// TODO: Improve this - (#711)
self.inner
.trackers
.lock()
.iter()
.map(|(id, value)| (*id, value.data.clone()))
.collect()
}
}
/// Blocks until all futures associated with the tracker have been
/// dropped and no more can be created
pub async fn join(&self) {
let mut watch = self.state.watch.clone();
/// An extension trait that provides `self.track(reg, {})` allowing
/// registering this future with a `TrackerRegistry`
pub trait TrackedFutureExt: Future {
fn track<T>(self, reg: &TrackerRegistry<T>, metadata: T) -> TrackedFuture<Self, T>
where
Self: Sized,
{
let (id, registration) = reg.track(metadata);
TrackedFuture {
inner: future::Abortable::new(self, registration),
reg: reg.clone(),
id,
// Wait until watch is set to true or the tx side is dropped
while !*watch.borrow() {
if watch.changed().await.is_err() {
// tx side has been dropped
warn!("tracker watch dropped without signalling");
break;
}
}
}
}
impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
/// The `Future` returned by `TrackedFutureExt::track()`
/// Unregisters the future from the registered `TrackerRegistry` on drop
/// and provides the early termination functionality used by
/// `TrackerRegistry::terminate`
#[pin_project(PinnedDrop)]
pub struct TrackedFuture<F: Future, T> {
#[pin]
inner: future::Abortable<F>,
reg: TrackerRegistry<T>,
id: TrackerId,
/// A TrackerRegistration is returned by TrackerRegistry::register and can be
/// used to register new TrackedFutures
///
/// A tracker will not be considered completed until all TrackerRegistrations
/// referencing it have been dropped. This is to prevent a race where further
/// TrackedFutures are registered with a Tracker that has already signalled
/// completion
#[derive(Debug)]
pub struct TrackerRegistration {
state: Arc<TrackerState>,
}
impl<F: Future, T> Future for TrackedFuture<F, T> {
type Output = Result<F::Output, future::Aborted>;
impl Clone for TrackerRegistration {
fn clone(&self) -> Self {
self.state
.pending_registrations
.fetch_add(1, Ordering::Relaxed);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
Self {
state: Arc::clone(&self.state),
}
}
}
#[pinned_drop]
impl<F: Future, T> PinnedDrop for TrackedFuture<F, T> {
fn drop(self: Pin<&mut Self>) {
// Note: This could cause a double-panic in an extreme situation where
// the internal `TrackerRegistry` lock is poisoned and drop was
// called as part of unwinding the stack to handle another panic
let this = self.project();
this.reg.untrack(this.id)
impl TrackerRegistration {
fn new(watch: tokio::sync::watch::Receiver<bool>) -> Self {
let state = Arc::new(TrackerState {
start_instant: Instant::now(),
cpu_nanos: AtomicUsize::new(0),
wall_nanos: AtomicUsize::new(0),
cancel_token: CancellationToken::new(),
created_futures: AtomicUsize::new(0),
pending_futures: AtomicUsize::new(0),
pending_registrations: AtomicUsize::new(1),
watch,
});
Self { state }
}
}
impl Drop for TrackerRegistration {
fn drop(&mut self) {
let previous = self
.state
.pending_registrations
.fetch_sub(1, Ordering::Release);
assert_ne!(previous, 0);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use tokio::sync::oneshot;
#[tokio::test]
async fn test_lifecycle() {
let (sender, receive) = oneshot::channel();
let reg = TrackerRegistry::new();
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
let task = tokio::spawn(receive.track(&reg, ()));
let task = tokio::spawn(receive.track(registration));
assert_eq!(reg.tracked().len(), 1);
assert_eq!(registry.running().len(), 1);
sender.send(()).unwrap();
task.await.unwrap().unwrap().unwrap();
assert_eq!(reg.tracked().len(), 0);
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_interleaved() {
let (sender1, receive1) = oneshot::channel();
let (sender2, receive2) = oneshot::channel();
let reg = TrackerRegistry::new();
let registry = TrackerRegistry::new();
let (_, registration1) = registry.register(1);
let (_, registration2) = registry.register(2);
let task1 = tokio::spawn(receive1.track(&reg, 1));
let task2 = tokio::spawn(receive2.track(&reg, 2));
let task1 = tokio::spawn(receive1.track(registration1));
let task2 = tokio::spawn(receive2.track(registration2));
let mut tracked: Vec<_> = reg.tracked().iter().map(|x| x.1).collect();
tracked.sort_unstable();
assert_eq!(tracked, vec![1, 2]);
let tracked = sorted(registry.running());
assert_eq!(get_metadata(&tracked), vec![1, 2]);
sender2.send(()).unwrap();
task2.await.unwrap().unwrap().unwrap();
let tracked: Vec<_> = reg.tracked().iter().map(|x| x.1).collect();
assert_eq!(tracked, vec![1]);
let tracked: Vec<_> = sorted(registry.running());
assert_eq!(get_metadata(&tracked), vec![1]);
sender1.send(42).unwrap();
let ret = task1.await.unwrap().unwrap().unwrap();
assert_eq!(ret, 42);
assert_eq!(reg.tracked().len(), 0);
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_drop() {
let reg = TrackerRegistry::new();
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
{
let f = futures::future::pending::<()>().track(&reg, ());
let f = futures::future::pending::<()>().track(registration);
assert_eq!(reg.tracked().len(), 1);
assert_eq!(registry.running().len(), 1);
std::mem::drop(f);
}
assert_eq!(reg.tracked().len(), 0);
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_drop_multiple() {
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
{
let f = futures::future::pending::<()>().track(registration.clone());
{
let f = futures::future::pending::<()>().track(registration);
assert_eq!(registry.running().len(), 1);
std::mem::drop(f);
}
assert_eq!(registry.running().len(), 1);
std::mem::drop(f);
}
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_terminate() {
let reg = TrackerRegistry::new();
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
let task = tokio::spawn(futures::future::pending::<()>().track(&reg, ()));
let task = tokio::spawn(futures::future::pending::<()>().track(registration));
let tracked = reg.tracked();
let tracked = registry.running();
assert_eq!(tracked.len(), 1);
reg.terminate(tracked[0].0);
tracked[0].cancel();
let result = task.await.unwrap();
assert!(result.is_err());
assert_eq!(reg.tracked().len(), 0);
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_terminate_early() {
let registry = TrackerRegistry::new();
let (tracker, registration) = registry.register(());
tracker.cancel();
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration));
let result1 = task1.await.unwrap();
assert!(result1.is_err());
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_terminate_multiple() {
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone()));
let task2 = tokio::spawn(futures::future::pending::<()>().track(registration));
let tracked = registry.running();
assert_eq!(tracked.len(), 1);
tracked[0].cancel();
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
assert!(result1.is_err());
assert!(result2.is_err());
assert_eq!(registry.running().len(), 0);
}
#[tokio::test]
async fn test_reclaim() {
let registry = TrackerRegistry::new();
let (_, registration1) = registry.register(1);
let (_, registration2) = registry.register(2);
let (_, registration3) = registry.register(3);
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration1.clone()));
let task2 = tokio::spawn(futures::future::pending::<()>().track(registration1));
let task3 = tokio::spawn(futures::future::ready(()).track(registration2.clone()));
let task4 = tokio::spawn(futures::future::pending::<()>().track(registration2));
let task5 = tokio::spawn(futures::future::pending::<()>().track(registration3));
let running = sorted(registry.running());
let tracked = sorted(registry.tracked());
assert_eq!(running.len(), 3);
assert_eq!(get_metadata(&running), vec![1, 2, 3]);
assert_eq!(tracked.len(), 3);
assert_eq!(get_metadata(&tracked), vec![1, 2, 3]);
// Trigger termination of task1 and task2
running[0].cancel();
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
assert!(result1.is_err());
assert!(result2.is_err());
let running = sorted(registry.running());
let tracked = sorted(registry.tracked());
assert_eq!(running.len(), 2);
assert_eq!(get_metadata(&running), vec![2, 3]);
assert_eq!(tracked.len(), 3);
assert_eq!(get_metadata(&tracked), vec![1, 2, 3]);
// Expect reclaim to find now finished registration1
let reclaimed = sorted(registry.reclaim());
assert_eq!(reclaimed.len(), 1);
assert_eq!(get_metadata(&reclaimed), vec![1]);
// Now expect tracked to match running
let running = sorted(registry.running());
let tracked = sorted(registry.tracked());
assert_eq!(running.len(), 2);
assert_eq!(get_metadata(&running), vec![2, 3]);
assert_eq!(tracked.len(), 2);
assert_eq!(get_metadata(&tracked), vec![2, 3]);
// Wait for task3 to finish
let result3 = task3.await.unwrap();
assert!(result3.is_ok());
assert_eq!(tracked[0].pending_futures(), 1);
assert_eq!(tracked[0].created_futures(), 2);
assert!(!tracked[0].is_complete());
// Trigger termination of task5
running[1].cancel();
let result5 = task5.await.unwrap();
assert!(result5.is_err());
let running = sorted(registry.running());
let tracked = sorted(registry.tracked());
assert_eq!(running.len(), 1);
assert_eq!(get_metadata(&running), vec![2]);
assert_eq!(tracked.len(), 2);
assert_eq!(get_metadata(&tracked), vec![2, 3]);
// Trigger termination of task4
running[0].cancel();
let result4 = task4.await.unwrap();
assert!(result4.is_err());
assert_eq!(running[0].pending_futures(), 0);
assert_eq!(running[0].created_futures(), 2);
assert!(running[0].is_complete());
let reclaimed = sorted(registry.reclaim());
assert_eq!(reclaimed.len(), 2);
assert_eq!(get_metadata(&reclaimed), vec![2, 3]);
assert_eq!(registry.tracked().len(), 0);
}
// Use n+1 threads where n is the number of "blocking" tasks
// to prevent stalling the tokio executor
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_timing() {
let registry = TrackerRegistry::new();
let (tracker1, registration1) = registry.register(1);
let (tracker2, registration2) = registry.register(2);
let (tracker3, registration3) = registry.register(3);
let task1 =
tokio::spawn(tokio::time::sleep(Duration::from_millis(100)).track(registration1));
let task2 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration2),
);
let task3 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }
.track(registration3.clone()),
);
let task4 = tokio::spawn(
async move { std::thread::sleep(Duration::from_millis(100)) }.track(registration3),
);
task1.await.unwrap().unwrap();
task2.await.unwrap().unwrap();
task3.await.unwrap().unwrap();
task4.await.unwrap().unwrap();
assert_eq!(tracker1.pending_futures(), 0);
assert_eq!(tracker2.pending_futures(), 0);
assert_eq!(tracker3.pending_futures(), 0);
assert!(tracker1.is_complete());
assert!(tracker2.is_complete());
assert!(tracker3.is_complete());
assert_eq!(tracker2.created_futures(), 1);
assert_eq!(tracker2.created_futures(), 1);
assert_eq!(tracker3.created_futures(), 2);
let assert_fuzzy = |actual: usize, expected: std::time::Duration| {
// Tolerance of 10 milliseconds, expressed in nanoseconds
let epsilon = Duration::from_millis(10).as_nanos() as usize;
let expected = expected.as_nanos() as usize;
assert!(
actual > expected.saturating_sub(epsilon),
"Expected {} got {}",
expected,
actual
);
assert!(
actual < expected.saturating_add(epsilon),
"Expected {} got {}",
expected,
actual
);
};
assert_fuzzy(tracker1.cpu_nanos(), Duration::from_millis(0));
assert_fuzzy(tracker1.wall_nanos(), Duration::from_millis(100));
assert_fuzzy(tracker2.cpu_nanos(), Duration::from_millis(100));
assert_fuzzy(tracker2.wall_nanos(), Duration::from_millis(100));
assert_fuzzy(tracker3.cpu_nanos(), Duration::from_millis(200));
assert_fuzzy(tracker3.wall_nanos(), Duration::from_millis(100));
}
#[tokio::test]
async fn test_register_race() {
let registry = TrackerRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone()));
task1.await.unwrap().unwrap();
// Should only consider tasks complete once cannot register more Futures
let reclaimed = registry.reclaim();
assert_eq!(reclaimed.len(), 0);
let task2 = tokio::spawn(futures::future::ready(()).track(registration));
task2.await.unwrap().unwrap();
let reclaimed = registry.reclaim();
assert_eq!(reclaimed.len(), 1);
}
fn sorted(mut input: Vec<Tracker<i32>>) -> Vec<Tracker<i32>> {
input.sort_unstable_by_key(|x| *x.metadata());
input
}
fn get_metadata(input: &[Tracker<i32>]) -> Vec<i32> {
let mut ret: Vec<_> = input.iter().map(|x| *x.metadata()).collect();
ret.sort_unstable();
ret
}
}

View File

@ -0,0 +1,87 @@
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use std::time::Instant;
use futures::{future::BoxFuture, prelude::*};
use pin_project::{pin_project, pinned_drop};
use super::{TrackerRegistration, TrackerState};
use std::sync::Arc;
/// An extension trait that provides `self.track(registration)`, allowing
/// this future to be associated with a `TrackerRegistration`
pub trait TrackedFutureExt: Future {
fn track(self, registration: TrackerRegistration) -> TrackedFuture<Self>
where
Self: Sized,
{
let tracker = Arc::clone(&registration.state);
let token = tracker.cancel_token.clone();
tracker.created_futures.fetch_add(1, Ordering::Relaxed);
tracker.pending_futures.fetch_add(1, Ordering::Relaxed);
// This must occur after the increment of pending_futures
std::mem::drop(registration);
// The future returned by CancellationToken::cancelled() borrows the token.
// To obtain a future with a 'static lifetime, move the token into an async
// block and box the result.
let abort = Box::pin(async move { token.cancelled().await });
TrackedFuture {
inner: self,
abort,
tracker,
}
}
}
impl<T: ?Sized> TrackedFutureExt for T where T: Future {}
/// The `Future` returned by `TrackedFutureExt::track()`
/// Unregisters the future from the registered `TrackerRegistry` on drop
/// and provides the early termination functionality used by
/// `TrackerRegistry::terminate`
#[pin_project(PinnedDrop)]
#[allow(missing_debug_implementations)]
pub struct TrackedFuture<F: Future> {
#[pin]
inner: F,
#[pin]
abort: BoxFuture<'static, ()>,
tracker: Arc<TrackerState>,
}
impl<F: Future> Future for TrackedFuture<F> {
type Output = Result<F::Output, future::Aborted>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.as_mut().project().abort.poll(cx).is_ready() {
return Poll::Ready(Err(future::Aborted {}));
}
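// Time the inner poll and attribute the elapsed nanoseconds to the
// tracker's cumulative CPU counter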
let start = Instant::now();
let poll = self.as_mut().project().inner.poll(cx);
let delta = start.elapsed().as_nanos() as usize;
self.tracker.cpu_nanos.fetch_add(delta, Ordering::Relaxed);
poll.map(Ok)
}
}
#[pinned_drop]
impl<F: Future> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) {
let state = &self.project().tracker;
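// Update the tracker's wall_nanos with the time elapsed since its start
// instant (keeping the maximum observed) and mark this future as no
// longer pending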
let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;
state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed);
let previous = state.pending_futures.fetch_sub(1, Ordering::Release);
assert_ne!(previous, 0);
}
}
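A minimal usage sketch of the extension trait above (not part of this diff; the `TrackerRegistry` type and the import paths are assumptions based on the other tracker modules in this PR):
```rust
// Hypothetical sketch: associate a spawned future with a tracker.
// Import paths are assumed from this PR's module layout.
use server::tracker::{TrackedFutureExt, TrackerRegistry};
use std::time::Duration;

async fn demo() {
    let registry = TrackerRegistry::new();
    // The metadata payload (here a &str) is application defined.
    let (tracker, registration) = registry.register("demo task");

    let handle = tokio::spawn(
        async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        .track(registration),
    );

    // A tracked future yields Ok(inner output) unless it was cancelled,
    // in which case it yields Err(futures::future::Aborted).
    handle.await.unwrap().unwrap();

    // `track` consumed the registration, so no further futures can be
    // registered and the tracker should now report completion.
    assert!(tracker.is_complete());
    assert_eq!(tracker.created_futures(), 1);
}
```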

View File

@ -0,0 +1,126 @@
use super::{Tracker, TrackerRegistration};
use hashbrown::HashMap;
use parking_lot::Mutex;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::debug;
/// Every future registered with a `TrackerRegistry` is assigned a unique
/// `TrackerId`
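///
/// A `TrackerId` round-trips through its string form so it can serve as the
/// `name` of a `google.longrunning.Operation` (see the operations gRPC
/// service later in this diff)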
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TrackerId(usize);
impl FromStr for TrackerId {
type Err = std::num::ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(FromStr::from_str(s)?))
}
}
impl ToString for TrackerId {
fn to_string(&self) -> String {
self.0.to_string()
}
}
/// Internal data stored by TrackerRegistry
#[derive(Debug)]
struct TrackerSlot<T> {
tracker: Tracker<T>,
watch: tokio::sync::watch::Sender<bool>,
}
/// Allows tracking the lifecycle of futures registered by
/// `TrackedFutureExt::track` with an accompanying metadata payload of type T
///
/// Additionally can trigger graceful cancellation of registered futures
#[derive(Debug)]
pub struct TrackerRegistry<T> {
next_id: AtomicUsize,
trackers: Mutex<HashMap<TrackerId, TrackerSlot<T>>>,
}
impl<T> Default for TrackerRegistry<T> {
fn default() -> Self {
Self {
next_id: AtomicUsize::new(0),
trackers: Default::default(),
}
}
}
impl<T> TrackerRegistry<T> {
pub fn new() -> Self {
Default::default()
}
/// Register a new tracker in the registry
pub fn register(&self, metadata: T) -> (Tracker<T>, TrackerRegistration) {
let id = TrackerId(self.next_id.fetch_add(1, Ordering::Relaxed));
let (sender, receiver) = tokio::sync::watch::channel(false);
let registration = TrackerRegistration::new(receiver);
let tracker = Tracker {
id,
metadata: Arc::new(metadata),
state: Arc::clone(&registration.state),
};
self.trackers.lock().insert(
id,
TrackerSlot {
tracker: tracker.clone(),
watch: sender,
},
);
(tracker, registration)
}
/// Removes completed tasks from the registry and returns a list of those
/// removed
pub fn reclaim(&self) -> Vec<Tracker<T>> {
self.trackers
.lock()
.drain_filter(|_, v| v.tracker.is_complete())
.map(|(_, v)| {
if let Err(error) = v.watch.send(true) {
// As we hold a reference to the Tracker here, this should be impossible
debug!(?error, "failed to publish tracker completion")
}
v.tracker
})
.collect()
}
}
impl<T: Clone> TrackerRegistry<T> {
pub fn get(&self, id: TrackerId) -> Option<Tracker<T>> {
self.trackers.lock().get(&id).map(|x| x.tracker.clone())
}
/// Returns a list of trackers, including those that are no longer running
pub fn tracked(&self) -> Vec<Tracker<T>> {
self.trackers
.lock()
.iter()
.map(|(_, v)| v.tracker.clone())
.collect()
}
/// Returns a list of active trackers
pub fn running(&self) -> Vec<Tracker<T>> {
self.trackers
.lock()
.iter()
.filter_map(|(_, v)| {
if !v.tracker.is_complete() {
return Some(v.tracker.clone());
}
None
})
.collect()
}
}
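A rough sketch of how the registry above might be polled by a housekeeping loop (not part of this diff; the method names are taken from the code above, the import path is assumed):
```rust
// Hypothetical sketch: report running jobs and reclaim finished ones.
use server::tracker::TrackerRegistry; // path assumed

fn housekeeping(registry: &TrackerRegistry<String>) {
    // `running()` returns only trackers that are not yet complete.
    for tracker in registry.running() {
        println!(
            "running job {}: {} pending future(s), {} cpu ns so far",
            tracker.id().to_string(),
            tracker.pending_futures(),
            tracker.cpu_nanos(),
        );
    }

    // `reclaim()` drops completed trackers from the registry and returns them.
    for finished in registry.reclaim() {
        println!(
            "reclaimed job {}: {}",
            finished.id().to_string(),
            finished.metadata()
        );
    }
}
```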

View File

@ -13,6 +13,8 @@ use influxdb_iox_client::{
use structopt::StructOpt;
use thiserror::Error;
mod chunk;
#[derive(Debug, Error)]
pub enum Error {
#[error("Error creating database: {0}")]
@ -41,6 +43,9 @@ pub enum Error {
#[error("Error querying: {0}")]
Query(#[from] influxdb_iox_client::flight::Error),
#[error("Error in chunk subcommand: {0}")]
Chunk(#[from] chunk::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -63,11 +68,15 @@ struct Create {
mutable_buffer: Option<u64>,
}
/// Get list of databases, or return configuration of specific database
/// Get list of databases
#[derive(Debug, StructOpt)]
struct List {}
/// Return configuration of specific database
#[derive(Debug, StructOpt)]
struct Get {
/// If specified, returns the configuration of the database
name: Option<String>,
/// The name of the database
name: String,
}
/// Write data into the specified database
@ -98,13 +107,15 @@ struct Query {
#[derive(Debug, StructOpt)]
enum Command {
Create(Create),
List(List),
Get(Get),
Write(Write),
Query(Query),
Chunk(chunk::Config),
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
let connection = Builder::default().build(url.clone()).await?;
match config.command {
Command::Create(command) => {
@ -123,16 +134,16 @@ pub async fn command(url: String, config: Config) -> Result<()> {
.await?;
println!("Ok");
}
Command::List(_) => {
let mut client = management::Client::new(connection);
let databases = client.list_databases().await?;
println!("{}", databases.join(", "))
}
Command::Get(get) => {
let mut client = management::Client::new(connection);
if let Some(name) = get.name {
let database = client.get_database(name).await?;
// TODO: Do something better than this
println!("{:#?}", database);
} else {
let databases = client.list_databases().await?;
println!("{}", databases.join(", "))
}
let database = client.get_database(get.name).await?;
// TODO: Do something better than this
println!("{:#?}", database);
}
Command::Write(write) => {
let mut client = write::Client::new(connection);
@ -173,8 +184,12 @@ pub async fn command(url: String, config: Config) -> Result<()> {
}
let formatted_result = format.format(&batches)?;
println!("{}", formatted_result);
}
Command::Chunk(config) => {
chunk::command(url, config).await?;
}
}
Ok(())

View File

@ -0,0 +1,73 @@
//! This module implements the `chunk` CLI command
use data_types::chunk::ChunkSummary;
use generated_types::google::FieldViolation;
use influxdb_iox_client::{
connection::Builder,
management::{self, ListChunksError},
};
use std::convert::TryFrom;
use structopt::StructOpt;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Error listing chunks: {0}")]
ListChunkError(#[from] ListChunksError),
#[error("Error interpreting server response: {0}")]
ConvertingResponse(#[from] FieldViolation),
#[error("Error rendering response as JSON: {0}")]
WritingJson(#[from] serde_json::Error),
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx chunks
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(subcommand)]
command: Command,
}
/// List the chunks for the specified database in JSON format
#[derive(Debug, StructOpt)]
struct List {
/// The name of the database
db_name: String,
}
/// All possible subcommands for chunk
#[derive(Debug, StructOpt)]
enum Command {
List(List),
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
match config.command {
Command::List(get) => {
let List { db_name } = get;
let mut client = management::Client::new(connection);
let chunks = client
.list_chunks(db_name)
.await
.map_err(Error::ListChunkError)?;
let chunks = chunks
.into_iter()
.map(ChunkSummary::try_from)
.collect::<Result<Vec<_>, FieldViolation>>()?;
serde_json::to_writer_pretty(std::io::stdout(), &chunks).map_err(Error::WritingJson)?;
}
}
Ok(())
}
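For comparison, the same listing can be driven directly from the client library that this subcommand wraps; a small sketch (not part of this diff):
```rust
// Hypothetical sketch using the client API exercised by the `chunk list` command.
use influxdb_iox_client::{connection::Builder, management};

async fn print_chunks(url: String, db_name: String) -> Result<(), Box<dyn std::error::Error>> {
    let connection = Builder::default().build(url).await?;
    let mut client = management::Client::new(connection);
    // Returns the protobuf Chunk messages; the command above converts them
    // to data_types::chunk::ChunkSummary before rendering them as JSON.
    let chunks = client.list_chunks(db_name).await?;
    println!("{:#?}", chunks);
    Ok(())
}
```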

View File

@ -2,7 +2,7 @@
use tracing_subscriber::{prelude::*, EnvFilter};
use super::server::{LogFormat, RunConfig};
use super::run::{Config, LogFormat};
/// Handles setting up logging levels
#[derive(Debug)]
@ -81,7 +81,7 @@ impl LoggingLevel {
/// Configures logging and tracing, based on the configuration
/// values, for the IOx server (the whole enchilada)
pub fn setup_logging(&self, config: &RunConfig) -> Option<opentelemetry_jaeger::Uninstall> {
pub fn setup_logging(&self, config: &Config) -> Option<opentelemetry_jaeger::Uninstall> {
// Copy anything from the config to the rust log environment
self.set_rust_log_if_needed(config.rust_log.clone());

341
src/commands/run.rs Normal file
View File

@ -0,0 +1,341 @@
//! Implementation of command line option for running server
use crate::commands::logging::LoggingLevel;
use crate::influxdb_ioxd;
use clap::arg_enum;
use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
use structopt::StructOpt;
use thiserror::Error;
/// The default bind address for the HTTP API.
pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
/// The default bind address for the gRPC.
pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The AWS region to use for Amazon S3 based object storage if none is
/// specified.
pub const FALLBACK_AWS_REGION: &str = "us-east-1";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
ServerError(#[from] influxdb_ioxd::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, StructOpt)]
#[structopt(
name = "run",
about = "Runs in server mode",
long_about = "Run the IOx server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
/// This controls the IOx server logging level, as described in
/// https://crates.io/crates/env_logger.
///
/// Levels for different modules can be specified as well. For example
/// `debug,hyper::proto::h1=info` specifies debug logging for all modules
/// except for the `hyper::proto::h1` module, which will only display info
/// level logging.
#[structopt(long = "--log", env = "RUST_LOG")]
pub rust_log: Option<String>,
/// Log message format. Can be one of:
///
/// "rust" (default)
/// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt)
#[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")]
pub log_format: Option<LogFormat>,
/// This sets logging up with a pre-configured set of convenient log levels.
///
/// -v means 'info' log levels
/// -vv means 'verbose' log level (with the exception of some particularly
/// low level libraries)
///
/// This option is ignored if --log / RUST_LOG are set
#[structopt(
short = "-v",
long = "--verbose",
multiple = true,
takes_value = false,
parse(from_occurrences)
)]
pub verbose_count: u64,
/// The identifier for the server.
///
/// Used for writing to object storage and as an identifier that is added to
/// replicated writes, WAL segments and Chunks. Must be unique in a group of
/// connected or semi-connected IOx servers. Must be a number that can be
/// represented by a 32-bit unsigned integer.
#[structopt(long = "--writer-id", env = "INFLUXDB_IOX_ID")]
pub writer_id: Option<u32>,
/// The address on which IOx will serve HTTP API requests.
#[structopt(
long = "--api-bind",
env = "INFLUXDB_IOX_BIND_ADDR",
default_value = DEFAULT_API_BIND_ADDR,
parse(try_from_str = parse_socket_addr),
)]
pub http_bind_address: SocketAddr,
/// The address on which IOx will serve Storage gRPC API requests.
#[structopt(
long = "--grpc-bind",
env = "INFLUXDB_IOX_GRPC_BIND_ADDR",
default_value = DEFAULT_GRPC_BIND_ADDR,
parse(try_from_str = parse_socket_addr),
)]
pub grpc_bind_address: SocketAddr,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
#[structopt(
long = "--object-store",
env = "INFLUXDB_IOX_OBJECT_STORE",
possible_values = &ObjectStore::variants(),
case_insensitive = true,
long_help = r#"Which object storage to use. If not specified, defaults to memory.
Possible values (case insensitive):
* memory (default): Effectively no object persistence.
* file: Stores objects in the local filesystem. Must also set `--data-dir`.
* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
possibly `--aws-default-region`.
* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
and `--azure-storage-access-key`.
"#,
)]
pub object_store: Option<ObjectStore>,
/// Name of the bucket to use for the object store. Must also set
/// `--object-store` to a cloud object storage to have any effect.
///
/// If using Google Cloud Storage for the object store, this item as well
/// as `--google-service-account` must be set.
///
/// If using S3 for the object store, must set this item as well
/// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
/// `--aws-default-region` if not using the fallback region.
///
/// If using Azure for the object store, set this item to the name of a
/// container you've created in the associated storage account, under
/// Blob Service > Containers. Must also set `--azure-storage-account` and
/// `--azure-storage-access-key`.
#[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
pub bucket: Option<String>,
/// When using Amazon S3 as the object store, set this to an access key that
/// has permission to read from and write to the specified S3 bucket.
///
/// Must also set `--object-store=s3`, `--bucket`, and
/// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
/// using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
pub aws_access_key_id: Option<String>,
/// When using Amazon S3 as the object store, set this to the secret access
/// key that goes with the specified access key ID.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
/// Can also set `--aws-default-region` if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
pub aws_secret_access_key: Option<String>,
/// When using Amazon S3 as the object store, set this to the region
/// that goes with the specified bucket if different from the fallback
/// value.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
/// and `--aws-secret-access-key`.
#[structopt(
long = "--aws-default-region",
env = "AWS_DEFAULT_REGION",
default_value = FALLBACK_AWS_REGION,
)]
pub aws_default_region: String,
/// When using Google Cloud Storage as the object store, set this to the
/// path to the JSON file that contains the Google credentials.
///
/// Must also set `--object-store=google` and `--bucket`.
#[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
pub google_service_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to the
/// name you see when going to All Services > Storage accounts > [name].
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-access-key`.
#[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
pub azure_storage_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to one of the
/// Key values in the Storage account's Settings > Access keys.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-account`.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// If set, Jaeger traces are emitted to this host
/// using the OpenTelemetry tracer.
///
/// NOTE: The OpenTelemetry agent CAN ONLY be
/// configured using environment variables. It CAN NOT be configured
/// using the command line at this time. Some useful variables:
///
/// * OTEL_SERVICE_NAME: emitter service name (iox by default)
/// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector
/// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector.
///
/// The entire list of variables can be found in
/// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter
#[structopt(
long = "--oetl_exporter_jaeger_agent",
env = "OTEL_EXPORTER_JAEGER_AGENT_HOST"
)]
pub jaeger_host: Option<String>,
}
pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> {
Ok(influxdb_ioxd::main(logging_level, config).await?)
}
fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
let mut addrs = s.to_socket_addrs()?;
// When name resolution fails, to_socket_addrs returns a validation error,
// so there is generally at least one resolved address unless the resolver
// misbehaves.
Ok(addrs
.next()
.expect("name resolution should return at least one address"))
}
arg_enum! {
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ObjectStore {
Memory,
File,
S3,
Google,
Azure,
}
}
/// How to format output logging messages
#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
/// Default formatted logging
///
/// Example:
/// ```
/// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd"
/// ```
Rust,
/// Use the (somewhat pretentiously named) Heroku / logfmt formatted output
/// format
///
/// Example:
/// ```
/// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage
/// ```
LogFmt,
}
impl Default for LogFormat {
fn default() -> Self {
Self::Rust
}
}
impl std::str::FromStr for LogFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"rust" => Ok(Self::Rust),
"logfmt" => Ok(Self::LogFmt),
_ => Err(format!(
"Invalid log format '{}'. Valid options: rust, logfmt",
s
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
fn to_vec(v: &[&str]) -> Vec<String> {
v.iter().map(|s| s.to_string()).collect()
}
#[test]
fn test_socketaddr() -> Result<(), clap::Error> {
let c = Config::from_iter_safe(
to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter(),
)?;
assert_eq!(
c.http_bind_address,
SocketAddr::from(([127, 0, 0, 1], 1234))
);
let c = Config::from_iter_safe(
to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter(),
)?;
// Depending on where the test runs, localhost will resolve to either an IPv4
// or an IPv6 address.
match c.http_bind_address {
SocketAddr::V4(so) => {
assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
}
SocketAddr::V6(so) => assert_eq!(
so,
SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0)
),
};
assert_eq!(
Config::from_iter_safe(
to_vec(&["server", "--api-bind", "!@INv_a1d(ad0/resp_!"]).into_iter(),
)
.map_err(|e| e.kind)
.expect_err("must fail"),
clap::ErrorKind::ValueValidation
);
Ok(())
}
}
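To illustrate the precedence described in the long_about near the top of this file, a small sketch (not part of this diff): a command line flag wins over the corresponding environment variable, which in turn is only a fallback.
```rust
// Hypothetical sketch: --api-bind overrides INFLUXDB_IOX_BIND_ADDR.
// Assumes the `Config` struct defined in this file is in scope.
fn flag_overrides_env_example() {
    std::env::set_var("INFLUXDB_IOX_BIND_ADDR", "127.0.0.1:9999");
    let config = Config::from_iter_safe(&["run", "--api-bind", "127.0.0.1:1234"]).unwrap();
    // The explicit flag takes precedence; the env var is only a fallback.
    assert_eq!(
        config.http_bind_address,
        std::net::SocketAddr::from(([127, 0, 0, 1], 1234))
    );
}
```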

View File

@ -1,27 +1,14 @@
//! Implementation of command line option for manipulating and showing server
//! config
use crate::commands::logging::LoggingLevel;
use crate::influxdb_ioxd;
use clap::arg_enum;
use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
use crate::commands::server_remote;
use structopt::StructOpt;
use thiserror::Error;
/// The default bind address for the HTTP API.
pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
/// The default bind address for the gRPC.
pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The AWS region to use for Amazon S3 based object storage if none is
/// specified.
pub const FALLBACK_AWS_REGION: &str = "us-east-1";
#[derive(Debug, Error)]
pub enum Error {
#[error("Server error: {0}")]
ServerError(#[from] influxdb_ioxd::Error),
#[error("Remote: {0}")]
RemoteError(#[from] server_remote::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -29,322 +16,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, StructOpt)]
#[structopt(name = "server", about = "IOx server commands")]
pub enum Config {
Run(RunConfig),
Remote(crate::commands::server_remote::Config),
}
#[derive(Debug, StructOpt)]
#[structopt(
name = "run",
about = "Runs in server mode",
long_about = "Run the IOx server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct RunConfig {
/// This controls the IOx server logging level, as described in
/// https://crates.io/crates/env_logger.
///
/// Levels for different modules can be specified as well. For example
/// `debug,hyper::proto::h1=info` specifies debug logging for all modules
/// except for the `hyper::proto::h1` module, which will only display info
/// level logging.
#[structopt(long = "--log", env = "RUST_LOG")]
pub rust_log: Option<String>,
/// Log message format. Can be one of:
///
/// "rust" (default)
/// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt)
#[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")]
pub log_format: Option<LogFormat>,
/// This sets logging up with a pre-configured set of convenient log levels.
///
/// -v means 'info' log levels
/// -vv means 'verbose' log level (with the exception of some particularly
/// low level libraries)
///
/// This option is ignored if --log / RUST_LOG are set
#[structopt(
short = "-v",
long = "--verbose",
multiple = true,
takes_value = false,
parse(from_occurrences)
)]
pub verbose_count: u64,
/// The identifier for the server.
///
/// Used for writing to object storage and as an identifier that is added to
/// replicated writes, WAL segments and Chunks. Must be unique in a group of
/// connected or semi-connected IOx servers. Must be a number that can be
/// represented by a 32-bit unsigned integer.
#[structopt(long = "--writer-id", env = "INFLUXDB_IOX_ID")]
pub writer_id: Option<u32>,
/// The address on which IOx will serve HTTP API requests.
#[structopt(
long = "--api-bind",
env = "INFLUXDB_IOX_BIND_ADDR",
default_value = DEFAULT_API_BIND_ADDR,
parse(try_from_str = parse_socket_addr),
)]
pub http_bind_address: SocketAddr,
/// The address on which IOx will serve Storage gRPC API requests.
#[structopt(
long = "--grpc-bind",
env = "INFLUXDB_IOX_GRPC_BIND_ADDR",
default_value = DEFAULT_GRPC_BIND_ADDR,
parse(try_from_str = parse_socket_addr),
)]
pub grpc_bind_address: SocketAddr,
/// The location InfluxDB IOx will use to store files locally.
#[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
#[structopt(
long = "--object-store",
env = "INFLUXDB_IOX_OBJECT_STORE",
possible_values = &ObjectStore::variants(),
case_insensitive = true,
long_help = r#"Which object storage to use. If not specified, defaults to memory.
Possible values (case insensitive):
* memory (default): Effectively no object persistence.
* file: Stores objects in the local filesystem. Must also set `--data-dir`.
* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
possibly `--aws-default-region`.
* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
and `--azure-storage-access-key`.
"#,
)]
pub object_store: Option<ObjectStore>,
/// Name of the bucket to use for the object store. Must also set
/// `--object-store` to a cloud object storage to have any effect.
///
/// If using Google Cloud Storage for the object store, this item as well
/// as `--google-service-account` must be set.
///
/// If using S3 for the object store, must set this item as well
/// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
/// `--aws-default-region` if not using the fallback region.
///
/// If using Azure for the object store, set this item to the name of a
/// container you've created in the associated storage account, under
/// Blob Service > Containers. Must also set `--azure-storage-account` and
/// `--azure-storage-access-key`.
#[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
pub bucket: Option<String>,
/// When using Amazon S3 as the object store, set this to an access key that
/// has permission to read from and write to the specified S3 bucket.
///
/// Must also set `--object-store=s3`, `--bucket`, and
/// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
/// using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
pub aws_access_key_id: Option<String>,
/// When using Amazon S3 as the object store, set this to the secret access
/// key that goes with the specified access key ID.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
/// Can also set `--aws-default-region` if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
pub aws_secret_access_key: Option<String>,
/// When using Amazon S3 as the object store, set this to the region
/// that goes with the specified bucket if different from the fallback
/// value.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
/// and `--aws-secret-access-key`.
#[structopt(
long = "--aws-default-region",
env = "AWS_DEFAULT_REGION",
default_value = FALLBACK_AWS_REGION,
)]
pub aws_default_region: String,
/// When using Google Cloud Storage as the object store, set this to the
/// path to the JSON file that contains the Google credentials.
///
/// Must also set `--object-store=google` and `--bucket`.
#[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
pub google_service_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to the
/// name you see when going to All Services > Storage accounts > [name].
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-access-key`.
#[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
pub azure_storage_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to one of the
/// Key values in the Storage account's Settings > Access keys.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-account`.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// If set, Jaeger traces are emitted to this host
/// using the OpenTelemetry tracer.
///
/// NOTE: The OpenTelemetry agent CAN ONLY be
/// configured using environment variables. It CAN NOT be configured
/// using the command line at this time. Some useful variables:
///
/// * OTEL_SERVICE_NAME: emitter service name (iox by default)
/// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector
/// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector.
///
/// The entire list of variables can be found in
/// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter
#[structopt(
long = "--oetl_exporter_jaeger_agent",
env = "OTEL_EXPORTER_JAEGER_AGENT_HOST"
)]
pub jaeger_host: Option<String>,
}
pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> {
pub async fn command(url: String, config: Config) -> Result<()> {
match config {
Config::Run(config) => Ok(influxdb_ioxd::main(logging_level, config).await?),
}
}
fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
let mut addrs = s.to_socket_addrs()?;
// When name resolution fails, to_socket_addrs returns a validation error,
// so there is generally at least one resolved address unless the resolver
// misbehaves.
Ok(addrs
.next()
.expect("name resolution should return at least one address"))
}
arg_enum! {
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ObjectStore {
Memory,
File,
S3,
Google,
Azure,
}
}
/// How to format output logging messages
#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
/// Default formatted logging
///
/// Example:
/// ```
/// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd"
/// ```
Rust,
/// Use the (somewhat pretentiously named) Heroku / logfmt formatted output
/// format
///
/// Example:
/// ```
/// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage
/// ```
LogFmt,
}
impl Default for LogFormat {
fn default() -> Self {
Self::Rust
}
}
impl std::str::FromStr for LogFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"rust" => Ok(Self::Rust),
"logfmt" => Ok(Self::LogFmt),
_ => Err(format!(
"Invalid log format '{}'. Valid options: rust, logfmt",
s
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
fn to_vec(v: &[&str]) -> Vec<String> {
v.iter().map(|s| s.to_string()).collect()
}
#[test]
fn test_socketaddr() -> Result<(), clap::Error> {
let c = RunConfig::from_iter_safe(
to_vec(&["server", "--api-bind", "127.0.0.1:1234"]).into_iter(),
)?;
assert_eq!(
c.http_bind_address,
SocketAddr::from(([127, 0, 0, 1], 1234))
);
let c = RunConfig::from_iter_safe(
to_vec(&["server", "--api-bind", "localhost:1234"]).into_iter(),
)?;
// Depending on where the test runs, localhost will resolve to either an IPv4
// or an IPv6 address.
match c.http_bind_address {
SocketAddr::V4(so) => {
assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
}
SocketAddr::V6(so) => assert_eq!(
so,
SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0)
),
};
assert_eq!(
RunConfig::from_iter_safe(
to_vec(&["server", "--api-bind", "!@INv_a1d(ad0/resp_!"]).into_iter(),
)
.map_err(|e| e.kind)
.expect_err("must fail"),
clap::ErrorKind::ValueValidation
);
Ok(())
Config::Remote(config) => Ok(server_remote::command(url, config).await?),
}
}

View File

@ -0,0 +1,76 @@
use influxdb_iox_client::{connection::Builder, management};
use structopt::StructOpt;
use thiserror::Error;
use prettytable::{format, Cell, Row, Table};
#[derive(Debug, Error)]
pub enum Error {
#[error("Error connecting to IOx: {0}")]
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Update remote error: {0}")]
UpdateError(#[from] management::UpdateRemoteError),
#[error("List remote error: {0}")]
ListError(#[from] management::ListRemotesError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, StructOpt)]
#[structopt(
name = "remote",
about = "Manage configuration about other IOx servers"
)]
pub enum Config {
/// Set connection parameters for a remote IOx server.
Set { id: u32, connection_string: String },
/// Remove a reference to a remote IOx server.
Remove { id: u32 },
/// List configured remote IOx server.
List,
}
pub async fn command(url: String, config: Config) -> Result<()> {
let connection = Builder::default().build(url).await?;
match config {
Config::Set {
id,
connection_string,
} => {
let mut client = management::Client::new(connection);
client.update_remote(id, connection_string).await?;
}
Config::Remove { id } => {
let mut client = management::Client::new(connection);
client.delete_remote(id).await?;
}
Config::List => {
let mut client = management::Client::new(connection);
let remotes = client.list_remotes().await?;
if remotes.is_empty() {
println!("no remotes configured");
} else {
let mut table = Table::new();
table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
table.set_titles(Row::new(vec![
Cell::new("ID"),
Cell::new("Connection string"),
]));
for i in remotes {
table.add_row(Row::new(vec![
Cell::new(&format!("{}", i.id)),
Cell::new(&i.connection_string),
]));
}
print!("{}", table);
}
}
};
Ok(())
}

View File

@ -1,6 +1,6 @@
use crate::commands::{
logging::LoggingLevel,
server::{ObjectStore as ObjStoreOpt, RunConfig},
run::{Config, ObjectStore as ObjStoreOpt},
};
use hyper::Server;
use object_store::{
@ -73,7 +73,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
///
/// The logging_level passed in is the global setting (e.g. if -v or
/// -vv was passed in before 'server')
pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()> {
pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
// Handle the case if -v/-vv is specified both before and after the server
// command
let logging_level = logging_level.combine(LoggingLevel::new(config.verbose_count));
@ -130,7 +130,6 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()>
info!(bind_address=?grpc_bind_addr, "gRPC server listening");
// Construct and start up HTTP server
let router_service = http::router_service(Arc::clone(&app_server));
let bind_addr = config.http_bind_address;
@ -142,8 +141,11 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()>
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
info!(git_hash, "InfluxDB IOx server ready");
// Wait for both the servers to complete
let (grpc_server, server) = futures::future::join(grpc_server, http_server).await;
// Get IOx background worker task
let app = app_server.background_worker();
// TODO: Fix shutdown handling (#827)
let (grpc_server, server, _) = futures::future::join3(grpc_server, http_server, app).await;
grpc_server.context(ServingRPC)?;
server.context(ServingHttp)?;
@ -153,10 +155,10 @@ pub async fn main(logging_level: LoggingLevel, config: RunConfig) -> Result<()>
Ok(())
}
impl TryFrom<&RunConfig> for ObjectStore {
impl TryFrom<&Config> for ObjectStore {
type Error = Error;
fn try_from(config: &RunConfig) -> Result<Self, Self::Error> {
fn try_from(config: &Config) -> Result<Self, Self::Error> {
match config.object_store {
Some(ObjStoreOpt::Memory) | None => {
Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
@ -283,7 +285,7 @@ mod tests {
#[test]
fn default_object_store_is_memory() {
let config = RunConfig::from_iter_safe(&["server"]).unwrap();
let config = Config::from_iter_safe(&["server"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
@ -295,7 +297,7 @@ mod tests {
#[test]
fn explicitly_set_object_store_to_memory() {
let config = RunConfig::from_iter_safe(&["server", "--object-store", "memory"]).unwrap();
let config = Config::from_iter_safe(&["server", "--object-store", "memory"]).unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
@ -307,7 +309,7 @@ mod tests {
#[test]
fn valid_s3_config() {
let config = RunConfig::from_iter_safe(&[
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"s3",
@ -330,7 +332,7 @@ mod tests {
#[test]
fn s3_config_missing_params() {
let config = RunConfig::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
let config = Config::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
@ -343,7 +345,7 @@ mod tests {
#[test]
fn valid_google_config() {
let config = RunConfig::from_iter_safe(&[
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"google",
@ -364,7 +366,7 @@ mod tests {
#[test]
fn google_config_missing_params() {
let config = RunConfig::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
let config = Config::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
@ -377,7 +379,7 @@ mod tests {
#[test]
fn valid_azure_config() {
let config = RunConfig::from_iter_safe(&[
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"azure",
@ -400,7 +402,7 @@ mod tests {
#[test]
fn azure_config_missing_params() {
let config = RunConfig::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
let config = Config::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
@ -415,7 +417,7 @@ mod tests {
fn valid_file_config() {
let root = TempDir::new().unwrap();
let config = RunConfig::from_iter_safe(&[
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"file",
@ -434,7 +436,7 @@ mod tests {
#[test]
fn file_config_missing_params() {
let config = RunConfig::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
let config = Config::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();

View File

@ -11,6 +11,7 @@ use server::{ConnectionManager, Server};
pub mod error;
mod flight;
mod management;
mod operations;
mod storage;
mod testing;
mod write;
@ -53,7 +54,8 @@ where
.add_service(storage::make_server(Arc::clone(&server)))
.add_service(flight::make_server(Arc::clone(&server)))
.add_service(write::make_server(Arc::clone(&server)))
.add_service(management::make_server(server))
.add_service(management::make_server(Arc::clone(&server)))
.add_service(operations::make_server(server))
.serve_with_incoming(stream)
.await
.context(ServerError {})

View File

@ -1,10 +1,10 @@
use generated_types::google::{InternalError, NotFound, PreconditionViolation};
use tracing::error;
use server::Error;
/// map common `server::Error` errors to the appropriate tonic Status
pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
use server::Error;
/// map common server errors to the appropriate tonic Status
pub fn default_error_handler(error: Error) -> tonic::Status {
match error {
Error::IdNotSet => PreconditionViolation {
category: "Writer ID".to_string(),
@ -24,3 +24,26 @@ pub fn default_error_handler(error: Error) -> tonic::Status {
}
}
}
/// map common `server::db::Error` errors to the appropriate tonic Status
pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
use server::db::Error;
match error {
Error::DatabaseNotReadable {} => PreconditionViolation {
category: "database".to_string(),
subject: "influxdata.com/iox".to_string(),
description: "Cannot read from database: no mutable buffer configured".to_string(),
}
.into(),
Error::DatatbaseNotWriteable {} => PreconditionViolation {
category: "database".to_string(),
subject: "influxdata.com/iox".to_string(),
description: "Cannot write to database: no mutable buffer configured".to_string(),
}
.into(),
error => {
error!(?error, "Unexpected error");
InternalError {}.into()
}
}
}

View File

@ -6,7 +6,7 @@ use data_types::database_rules::DatabaseRules;
use data_types::DatabaseName;
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use generated_types::influxdata::iox::management::v1::*;
use query::DatabaseStore;
use query::{Database, DatabaseStore};
use server::{ConnectionManager, Error, Server};
use tonic::{Request, Response, Status};
@ -14,7 +14,7 @@ struct ManagementService<M: ConnectionManager> {
server: Arc<Server<M>>,
}
use super::error::default_error_handler;
use super::error::{default_db_error_handler, default_server_error_handler};
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
@ -92,10 +92,51 @@ where
}
.into())
}
Err(e) => Err(default_error_handler(e)),
Err(e) => Err(default_server_error_handler(e)),
}
}
async fn list_chunks(
&self,
request: Request<ListChunksRequest>,
) -> Result<Response<ListChunksResponse>, Status> {
let db_name = DatabaseName::new(request.into_inner().db_name).field("db_name")?;
let db = match self.server.db(&db_name) {
Some(db) => db,
None => {
return Err(NotFound {
resource_type: "database".to_string(),
resource_name: db_name.to_string(),
..Default::default()
}
.into())
}
};
let chunk_summaries = match db.chunk_summaries() {
Ok(chunk_summaries) => chunk_summaries,
Err(e) => return Err(default_db_error_handler(e)),
};
let chunks: Vec<Chunk> = chunk_summaries
.into_iter()
.map(|summary| summary.into())
.collect();
Ok(Response::new(ListChunksResponse { chunks }))
}
async fn create_dummy_job(
&self,
request: Request<CreateDummyJobRequest>,
) -> Result<Response<CreateDummyJobResponse>, Status> {
let request = request.into_inner();
let slot = self.server.spawn_dummy_job(request.nanos);
let operation = Some(super::operations::encode_tracker(slot)?);
Ok(Response::new(CreateDummyJobResponse { operation }))
}
async fn list_remotes(
&self,
_: Request<ListRemotesRequest>,

View File

@ -0,0 +1,182 @@
use std::fmt::Debug;
use std::sync::Arc;
use bytes::BytesMut;
use prost::Message;
use tonic::Response;
use tracing::debug;
use data_types::job::Job;
use generated_types::google::FieldViolationExt;
use generated_types::{
google::{
longrunning::*,
protobuf::{Any, Empty},
rpc::Status,
FieldViolation, InternalError, NotFound,
},
influxdata::iox::management::v1 as management,
};
use server::{
tracker::{Tracker, TrackerId},
ConnectionManager, Server,
};
use std::convert::TryInto;
/// Implementation of the operations service
struct OperationsService<M: ConnectionManager> {
server: Arc<Server<M>>,
}
pub fn encode_tracker(tracker: Tracker<Job>) -> Result<Operation, tonic::Status> {
let id = tracker.id();
let is_complete = tracker.is_complete();
let is_cancelled = tracker.is_cancelled();
let mut buffer = BytesMut::new();
management::OperationMetadata {
cpu_nanos: tracker.cpu_nanos() as _,
wall_nanos: tracker.wall_nanos() as _,
task_count: tracker.created_futures() as _,
pending_count: tracker.pending_futures() as _,
job: Some(tracker.metadata().clone().into()),
}
.encode(&mut buffer)
.map_err(|error| {
debug!(?error, "Unexpected error");
InternalError {}
})?;
let metadata = Any {
type_url: "type.googleapis.com/influxdata.iox.management.v1.OperationMetadata".to_string(),
value: buffer.freeze(),
};
let result = match (is_complete, is_cancelled) {
(true, true) => Some(operation::Result::Error(Status {
code: tonic::Code::Cancelled as _,
message: "Job cancelled".to_string(),
details: vec![],
})),
(true, false) => Some(operation::Result::Response(Any {
type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
value: Default::default(), // TODO: Verify this is correct
})),
_ => None,
};
Ok(Operation {
name: id.to_string(),
metadata: Some(metadata),
done: is_complete,
result,
})
}
fn get_tracker<M>(server: &Server<M>, tracker: String) -> Result<Tracker<Job>, tonic::Status>
where
M: ConnectionManager,
{
let tracker_id = tracker.parse::<TrackerId>().map_err(|e| FieldViolation {
field: "name".to_string(),
description: e.to_string(),
})?;
let tracker = server.get_job(tracker_id).ok_or(NotFound {
resource_type: "job".to_string(),
resource_name: tracker,
..Default::default()
})?;
Ok(tracker)
}
#[tonic::async_trait]
impl<M> operations_server::Operations for OperationsService<M>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
async fn list_operations(
&self,
_request: tonic::Request<ListOperationsRequest>,
) -> Result<tonic::Response<ListOperationsResponse>, tonic::Status> {
// TODO: Support pagination
let operations: Result<Vec<_>, _> = self
.server
.tracked_jobs()
.into_iter()
.map(encode_tracker)
.collect();
Ok(Response::new(ListOperationsResponse {
operations: operations?,
next_page_token: Default::default(),
}))
}
async fn get_operation(
&self,
request: tonic::Request<GetOperationRequest>,
) -> Result<tonic::Response<Operation>, tonic::Status> {
let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?;
Ok(Response::new(encode_tracker(tracker)?))
}
async fn delete_operation(
&self,
_: tonic::Request<DeleteOperationRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
Err(tonic::Status::unimplemented(
"IOx does not support operation deletion",
))
}
async fn cancel_operation(
&self,
request: tonic::Request<CancelOperationRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?;
tracker.cancel();
Ok(Response::new(Empty {}))
}
async fn wait_operation(
&self,
request: tonic::Request<WaitOperationRequest>,
) -> Result<tonic::Response<Operation>, tonic::Status> {
// This should take into account the context deadline timeout
// Unfortunately these are currently stripped by tonic
// - https://github.com/hyperium/tonic/issues/75
let request = request.into_inner();
let tracker = get_tracker(self.server.as_ref(), request.name)?;
if let Some(timeout) = request.timeout {
let timeout = timeout.try_into().field("timeout")?;
// Timeout is not an error so suppress it
let _ = tokio::time::timeout(timeout, tracker.join()).await;
} else {
tracker.join().await;
}
Ok(Response::new(encode_tracker(tracker)?))
}
}
/// Instantiate the operations service
pub fn make_server<M>(
server: Arc<Server<M>>,
) -> operations_server::OperationsServer<impl operations_server::Operations>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
operations_server::OperationsServer::new(OperationsService { server })
}
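On the client side, the metadata `Any` packed by `encode_tracker` above can be unpacked back into the prost-generated message; a rough sketch (not part of this diff):
```rust
// Hypothetical sketch: decode the OperationMetadata produced by encode_tracker.
use generated_types::google::longrunning::Operation;
use generated_types::influxdata::iox::management::v1 as management;
use prost::Message;

fn decode_metadata(
    operation: &Operation,
) -> Result<management::OperationMetadata, prost::DecodeError> {
    let any = operation
        .metadata
        .as_ref()
        .expect("operation carries metadata");
    // `any.value` holds the serialized OperationMetadata bytes packed above.
    management::OperationMetadata::decode(any.value.clone())
}
```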

View File

@ -7,7 +7,7 @@ use std::fmt::Debug;
use tonic::Response;
use tracing::debug;
use super::error::default_error_handler;
use super::error::default_server_error_handler;
/// Implementation of the write service
struct WriteService<M: ConnectionManager> {
@ -25,7 +25,7 @@ where
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let request = request.into_inner();
let db_name = request.name;
let db_name = request.db_name;
let lp_data = request.lp_data;
let lp_chars = lp_data.len();
@ -42,7 +42,7 @@ where
self.server
.write_lines(&db_name, &lines)
.await
.map_err(default_error_handler)?;
.map_err(default_server_error_handler)?;
let lines_written = lp_line_count as u64;
Ok(Response::new(WriteResponse { lines_written }))

View File

@ -23,7 +23,9 @@ mod commands {
mod input;
pub mod logging;
pub mod meta;
pub mod run;
pub mod server;
pub mod server_remote;
pub mod stats;
pub mod writer;
}
@ -45,13 +47,13 @@ Examples:
influxdb_iox
# Display all server settings
influxdb_iox server --help
influxdb_iox run --help
# Run the InfluxDB IOx server with extra verbose logging
influxdb_iox -v
influxdb_iox run -v
# Run InfluxDB IOx with full debug logging specified with RUST_LOG
RUST_LOG=debug influxdb_iox
RUST_LOG=debug influxdb_iox run
# converts line protocol formatted data in temperature.lp to out.parquet
influxdb_iox convert temperature.lp out.parquet
@ -109,9 +111,10 @@ enum Command {
input: String,
},
Database(commands::database::Config),
Stats(commands::stats::Config),
// Clippy recommended boxing this variant because it's much larger than the others
Server(Box<commands::server::Config>),
Run(Box<commands::run::Config>),
Stats(commands::stats::Config),
Server(commands::server::Config),
Writer(commands::writer::Config),
}
@ -183,9 +186,16 @@ fn main() -> Result<(), std::io::Error> {
}
}
Command::Server(config) => {
logging_level.setup_basic_logging();
if let Err(e) = commands::server::command(host, config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Run(config) => {
// Note: don't set up basic logging here; different logging rules apply in
// server mode
if let Err(e) = commands::server::command(logging_level, *config).await {
if let Err(e) = commands::run::command(logging_level, *config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}

View File

@ -225,7 +225,6 @@ impl TestServer {
let server_process = Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("run")
// Can enable for debugging
//.arg("-vv")
@ -251,7 +250,6 @@ impl TestServer {
self.server_process.wait().unwrap();
self.server_process = Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("run")
// Can enable for debugging
//.arg("-vv")

View File

@ -3,25 +3,17 @@ use std::num::NonZeroU32;
use generated_types::google::protobuf::Empty;
use generated_types::{google::protobuf::Duration, influxdata::iox::management::v1::*};
use influxdb_iox_client::management::{Client, CreateDatabaseError};
use test_helpers::assert_contains;
use crate::common::server_fixture::ServerFixture;
use super::util::rand_name;
use super::util::{create_readable_database, create_unreadable_database, rand_name};
#[tokio::test]
pub async fn test() {
async fn test_list_update_remotes() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = Client::new(server_fixture.grpc_channel());
test_list_update_remotes(&mut client).await;
test_set_get_writer_id(&mut client).await;
test_create_database_duplicate_name(&mut client).await;
test_create_database_invalid_name(&mut client).await;
test_list_databases(&mut client).await;
test_create_get_database(&mut client).await;
}
async fn test_list_update_remotes(client: &mut Client) {
const TEST_REMOTE_ID_1: u32 = 42;
const TEST_REMOTE_ADDR_1: &str = "1.2.3.4:1234";
const TEST_REMOTE_ID_2: u32 = 84;
@ -77,7 +69,11 @@ async fn test_list_update_remotes(client: &mut Client) {
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2_UPDATED);
}
async fn test_set_get_writer_id(client: &mut Client) {
#[tokio::test]
async fn test_set_get_writer_id() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = Client::new(server_fixture.grpc_channel());
const TEST_ID: u32 = 42;
client
@ -90,7 +86,11 @@ async fn test_set_get_writer_id(client: &mut Client) {
assert_eq!(got.get(), TEST_ID);
}
async fn test_create_database_duplicate_name(client: &mut Client) {
#[tokio::test]
async fn test_create_database_duplicate_name() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = Client::new(server_fixture.grpc_channel());
let db_name = rand_name();
client
@ -115,7 +115,11 @@ async fn test_create_database_duplicate_name(client: &mut Client) {
))
}
async fn test_create_database_invalid_name(client: &mut Client) {
#[tokio::test]
async fn test_create_database_invalid_name() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = Client::new(server_fixture.grpc_channel());
let err = client
.create_database(DatabaseRules {
name: "my_example\ndb".to_string(),
@ -127,7 +131,11 @@ async fn test_create_database_invalid_name(client: &mut Client) {
assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_)));
}
async fn test_list_databases(client: &mut Client) {
#[tokio::test]
async fn test_list_databases() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = Client::new(server_fixture.grpc_channel());
let name = rand_name();
client
.create_database(DatabaseRules {
@ -144,7 +152,11 @@ async fn test_list_databases(client: &mut Client) {
assert!(names.contains(&name));
}
async fn test_create_get_database(client: &mut Client) {
#[tokio::test]
async fn test_create_get_database() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = Client::new(server_fixture.grpc_channel());
let db_name = rand_name();
// Specify everything to allow direct comparison between request and response
@ -191,3 +203,87 @@ async fn test_create_get_database(client: &mut Client) {
assert_eq!(response, rules);
}
#[tokio::test]
async fn test_chunk_get() {
use generated_types::influxdata::iox::management::v1::{Chunk, ChunkStorage};
let fixture = ServerFixture::create_shared().await;
let mut management_client = Client::new(fixture.grpc_channel());
let mut write_client = influxdb_iox_client::write::Client::new(fixture.grpc_channel());
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let mut chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
// ensure the output order is consistent
chunks.sort_by(|c1, c2| c1.partition_key.cmp(&c2.partition_key));
let expected: Vec<Chunk> = vec![
Chunk {
partition_key: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 145,
},
Chunk {
partition_key: "disk".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 107,
},
];
assert_eq!(
expected, chunks,
"expected:\n\n{:#?}\n\nactual:{:#?}",
expected, chunks
);
}
#[tokio::test]
async fn test_chunk_get_errors() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = Client::new(fixture.grpc_channel());
let db_name = rand_name();
let err = management_client
.list_chunks(&db_name)
.await
.expect_err("no db had been created");
assert_contains!(
err.to_string(),
"Some requested entity was not found: Resource database"
);
create_unreadable_database(&db_name, fixture.grpc_channel()).await;
let err = management_client
.list_chunks(&db_name)
.await
.expect_err("db can't be read");
assert_contains!(
err.to_string(),
"Precondition violation influxdata.com/iox - database"
);
assert_contains!(
err.to_string(),
"Cannot read from database: no mutable buffer configured"
);
}

View File

@ -1,18 +1,15 @@
use assert_cmd::Command;
use predicates::prelude::*;
use test_helpers::make_temp_file;
use crate::common::server_fixture::ServerFixture;
use super::util::{create_readable_database, rand_name};
#[tokio::test]
pub async fn test() {
let server_fixture = ServerFixture::create_single_use().await;
async fn test_writer_id() {
let server_fixture = ServerFixture::create_shared().await;
let addr = server_fixture.grpc_base();
test_writer_id(addr).await;
test_create_database(addr).await;
}
async fn test_writer_id(addr: &str) {
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("writer")
@ -35,8 +32,12 @@ async fn test_writer_id(addr: &str) {
.stdout(predicate::str::contains("32"));
}
async fn test_create_database(addr: &str) {
let db = "management-cli-test";
#[tokio::test]
async fn test_create_database() {
let server_fixture = ServerFixture::create_shared().await;
let addr = server_fixture.grpc_base();
let db_name = rand_name();
let db = &db_name;
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -63,7 +64,7 @@ async fn test_create_database(addr: &str) {
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("get")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
@ -81,3 +82,138 @@ async fn test_create_database(addr: &str) {
.success()
.stdout(predicate::str::contains(format!("name: \"{}\"", db)));
}
#[tokio::test]
async fn test_get_chunks() {
let server_fixture = ServerFixture::create_shared().await;
let addr = server_fixture.grpc_base();
let db_name = rand_name();
create_readable_database(&db_name, server_fixture.grpc_channel()).await;
let lp_data = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
];
let lp_data_file = make_temp_file(lp_data.join("\n"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("write")
.arg(&db_name)
.arg(lp_data_file.as_ref())
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("2 Lines OK"));
let expected = r#"[
{
"partition_key": "cpu",
"id": 0,
"storage": "OpenMutableBuffer",
"estimated_bytes": 145
}
]"#;
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("chunk")
.arg("list")
.arg(&db_name)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(expected));
}
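Side note, not part of the diff: the `predicate::str::contains(expected)` check above is sensitive to the CLI's exact pretty-printing. A rough sketch of a structural check instead, assuming `serde_json` were available as a dev-dependency (it is not added by this change):

```rust
// Sketch only: parse the CLI's JSON chunk listing and assert on fields,
// rather than on the exact formatting of the output.
fn check_chunk_listing(stdout: &str) {
    let chunks: serde_json::Value = serde_json::from_str(stdout).expect("valid JSON");
    assert_eq!(chunks[0]["partition_key"], "cpu");
    assert_eq!(chunks[0]["storage"], "OpenMutableBuffer");
    assert_eq!(chunks[0]["estimated_bytes"], 145);
}
```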
#[tokio::test]
async fn test_list_chunks_error() {
let server_fixture = ServerFixture::create_shared().await;
let addr = server_fixture.grpc_base();
let db_name = rand_name();
// note: intentionally do not create the database; expect an error
// list the chunks
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("chunk")
.arg("list")
.arg(&db_name)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(
predicate::str::contains("Some requested entity was not found: Resource database")
.and(predicate::str::contains(&db_name)),
);
}
#[tokio::test]
async fn test_remotes() {
let server_fixture = ServerFixture::create_single_use().await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("remote")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("no remotes configured"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("remote")
.arg("set")
.arg("1")
.arg("http://1.2.3.4:1234")
.arg("--host")
.arg(addr)
.assert()
.success();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("remote")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("http://1.2.3.4:1234"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("remote")
.arg("remove")
.arg("1")
.arg("--host")
.arg(addr)
.assert()
.success();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("remote")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("no remotes configured"));
}

View File

@ -1,5 +1,8 @@
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use generated_types::google::protobuf::Empty;
use generated_types::influxdata::iox::management::v1::*;
/// Return a random string suitable for use as a database name
pub fn rand_name() -> String {
thread_rng()
@ -8,3 +11,51 @@ pub fn rand_name() -> String {
.map(char::from)
.collect()
}
/// Given a channel to talk with the management API, create a new
/// database with the specified name, configured with a 10MB mutable
/// buffer and partitioned on table
pub async fn create_readable_database(
db_name: impl Into<String>,
channel: tonic::transport::Channel,
) {
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let rules = DatabaseRules {
name: db_name.into(),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Table(Empty {})),
}],
}),
mutable_buffer_config: Some(MutableBufferConfig {
buffer_size: 10 * 1024 * 1024,
..Default::default()
}),
..Default::default()
};
management_client
.create_database(rules.clone())
.await
.expect("create database failed");
}
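A minimal usage sketch of this helper, mirroring calls made elsewhere in this change; the test name is hypothetical and the snippet assumes it lives in the same integration-test crate:

```rust
use crate::common::server_fixture::ServerFixture;

#[tokio::test]
async fn example_readable_database_roundtrip() {
    let fixture = ServerFixture::create_shared().await;
    let db_name = rand_name();

    // create a database that accepts writes (10MB mutable buffer, partitioned on table)
    create_readable_database(&db_name, fixture.grpc_channel()).await;

    // writes against it should now succeed
    let mut write_client = influxdb_iox_client::write::Client::new(fixture.grpc_channel());
    write_client
        .write(&db_name, "cpu,region=west user=1.0 100")
        .await
        .expect("write succeeded");
}
```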
/// Given a channel to talk with the management API, create a new
/// database with no mutable buffer configured and no partitioning rules
pub async fn create_unreadable_database(
db_name: impl Into<String>,
channel: tonic::transport::Channel,
) {
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let rules = DatabaseRules {
name: db_name.into(),
..Default::default()
};
management_client
.create_database(rules.clone())
.await
.expect("create database failed");
}

View File

@ -1,26 +1,17 @@
use influxdb_iox_client::management::{self, generated_types::DatabaseRules};
use influxdb_iox_client::write::{self, WriteError};
use test_helpers::assert_contains;
use super::util::rand_name;
use crate::common::server_fixture::ServerFixture;
use super::util::{create_readable_database, rand_name};
#[tokio::test]
async fn test_write() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = management::Client::new(fixture.grpc_channel());
let mut write_client = write::Client::new(fixture.grpc_channel());
let db_name = rand_name();
management_client
.create_database(DatabaseRules {
name: db_name.clone(),
..Default::default()
})
.await
.expect("create database failed");
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines = vec![
"cpu,region=west user=23.2 100",