Merge branch 'main' into alamb/update_state_machine
commit
dc152abb55
|
@ -185,7 +185,7 @@ jobs:
|
||||||
- cache_restore
|
- cache_restore
|
||||||
- run:
|
- run:
|
||||||
name: Cargo test
|
name: Cargo test
|
||||||
command: cargo test --workspace
|
command: cargo test --workspace --features=kafka
|
||||||
- cache_save
|
- cache_save
|
||||||
|
|
||||||
# end to end tests with Heappy (heap profiling enabled)
|
# end to end tests with Heappy (heap profiling enabled)
|
||||||
|
@ -240,7 +240,7 @@ jobs:
|
||||||
- cargo-lock-{{ checksum "Cargo.lock" }}
|
- cargo-lock-{{ checksum "Cargo.lock" }}
|
||||||
- run:
|
- run:
|
||||||
name: Prime Rust build cache
|
name: Prime Rust build cache
|
||||||
command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator
|
command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator --features=kafka
|
||||||
- save_cache:
|
- save_cache:
|
||||||
key: cargo-lock-{{ checksum "Cargo.lock" }}
|
key: cargo-lock-{{ checksum "Cargo.lock" }}
|
||||||
paths:
|
paths:
|
||||||
|
@ -277,8 +277,8 @@ jobs:
|
||||||
name: Build benches
|
name: Build benches
|
||||||
command: cargo test --workspace --benches --no-run
|
command: cargo test --workspace --benches --no-run
|
||||||
- run:
|
- run:
|
||||||
name: Build with object store + exporter support + HEAP profiling
|
name: Build with object store + exporter support + HEAP profiling + kafka
|
||||||
command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof"
|
command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof,kafka"
|
||||||
- cache_save
|
- cache_save
|
||||||
|
|
||||||
# Lint protobufs.
|
# Lint protobufs.
|
||||||
|
|
|
@ -2486,6 +2486,8 @@ dependencies = [
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-test",
|
"futures-test",
|
||||||
|
"hyper",
|
||||||
|
"hyper-tls",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"itertools",
|
"itertools",
|
||||||
"observability_deps",
|
"observability_deps",
|
||||||
|
|
|
@ -15,7 +15,7 @@ WORKDIR /influxdb_iox
|
||||||
|
|
||||||
ARG CARGO_INCREMENTAL=yes
|
ARG CARGO_INCREMENTAL=yes
|
||||||
ARG PROFILE=release
|
ARG PROFILE=release
|
||||||
ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc
|
ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc,kafka
|
||||||
ARG ROARING_ARCH="haswell"
|
ARG ROARING_ARCH="haswell"
|
||||||
ARG RUSTFLAGS=""
|
ARG RUSTFLAGS=""
|
||||||
ENV CARGO_INCREMENTAL=$CARGO_INCREMENTAL \
|
ENV CARGO_INCREMENTAL=$CARGO_INCREMENTAL \
|
||||||
|
|
|
@ -42,6 +42,7 @@ message OperationMetadata {
|
||||||
CompactObjectStoreChunks compact_object_store_chunks = 18;
|
CompactObjectStoreChunks compact_object_store_chunks = 18;
|
||||||
LoadReadBufferChunk load_read_buffer_chunk = 19;
|
LoadReadBufferChunk load_read_buffer_chunk = 19;
|
||||||
RebuildPreservedCatalog rebuild_preserved_catalog = 20;
|
RebuildPreservedCatalog rebuild_preserved_catalog = 20;
|
||||||
|
CompactObjectStorePartition compact_object_store_partition = 21;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,6 +111,18 @@ message CompactObjectStoreChunks {
|
||||||
repeated bytes chunks = 4;
|
repeated bytes chunks = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compact OS chunks of a partition into a single chunk
|
||||||
|
message CompactObjectStorePartition {
|
||||||
|
// name of the database
|
||||||
|
string db_name = 1;
|
||||||
|
|
||||||
|
// partition key
|
||||||
|
string partition_key = 2;
|
||||||
|
|
||||||
|
// table name
|
||||||
|
string table_name = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Split and write chunks to object store
|
// Split and write chunks to object store
|
||||||
message PersistChunks {
|
message PersistChunks {
|
||||||
|
|
|
@ -95,6 +95,10 @@ service ManagementService {
|
||||||
//
|
//
|
||||||
// Errors if the chunks are not compacted yet and not contiguous
|
// Errors if the chunks are not compacted yet and not contiguous
|
||||||
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
|
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
|
||||||
|
|
||||||
|
// Compact all object store chunks of a given partition
|
||||||
|
//
|
||||||
|
rpc CompactObjectStorePartition(CompactObjectStorePartitionRequest) returns (CompactObjectStorePartitionResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListDatabasesRequest {
|
message ListDatabasesRequest {
|
||||||
|
@ -505,3 +509,19 @@ message CompactObjectStoreChunksResponse {
|
||||||
google.longrunning.Operation operation = 1;
|
google.longrunning.Operation operation = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Request to commpact all object store of a given partition
|
||||||
|
message CompactObjectStorePartitionRequest {
|
||||||
|
// the name of the database
|
||||||
|
string db_name = 1;
|
||||||
|
|
||||||
|
// the partition key
|
||||||
|
string partition_key = 2;
|
||||||
|
|
||||||
|
// the table name
|
||||||
|
string table_name = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CompactObjectStorePartitionResponse {
|
||||||
|
// The operation that tracks the work for compacting object store chunks
|
||||||
|
google.longrunning.Operation operation = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,10 @@ impl management::operation_metadata::Job {
|
||||||
db_name,
|
db_name,
|
||||||
..
|
..
|
||||||
}) => db_name,
|
}) => db_name,
|
||||||
|
Self::CompactObjectStorePartition(management::CompactObjectStorePartition {
|
||||||
|
db_name,
|
||||||
|
..
|
||||||
|
}) => db_name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,3 +124,7 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]
|
||||||
# Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
|
# Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
|
||||||
# linting
|
# linting
|
||||||
clippy = []
|
clippy = []
|
||||||
|
|
||||||
|
# Enable the write buffer implemented with Kafka. Disabled by default to save build time when not
|
||||||
|
# working on Kafka write buffer-related code.
|
||||||
|
kafka = ["router/kafka", "server/kafka"]
|
||||||
|
|
|
@ -85,6 +85,19 @@ struct CompactObjectStoreChunks {
|
||||||
chunk_ids: Vec<Uuid>,
|
chunk_ids: Vec<Uuid>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all Object Store Chunks of a partition
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
struct CompactObjectStorePartition {
|
||||||
|
/// The name of the database
|
||||||
|
db_name: String,
|
||||||
|
|
||||||
|
/// The partition key
|
||||||
|
partition_key: String,
|
||||||
|
|
||||||
|
/// The table name
|
||||||
|
table_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
/// lists all chunks in this partition
|
/// lists all chunks in this partition
|
||||||
#[derive(Debug, StructOpt)]
|
#[derive(Debug, StructOpt)]
|
||||||
struct ListChunks {
|
struct ListChunks {
|
||||||
|
@ -175,6 +188,9 @@ enum Command {
|
||||||
/// Errors if the chunks are not yet compacted and not contiguous.
|
/// Errors if the chunks are not yet compacted and not contiguous.
|
||||||
CompactObjectStoreChunks(CompactObjectStoreChunks),
|
CompactObjectStoreChunks(CompactObjectStoreChunks),
|
||||||
|
|
||||||
|
/// Compact all object store chunks of a given partition
|
||||||
|
CompactObjectStorePartition(CompactObjectStorePartition),
|
||||||
|
|
||||||
/// Drop partition from memory and (if persisted) from object store.
|
/// Drop partition from memory and (if persisted) from object store.
|
||||||
Drop(DropPartition),
|
Drop(DropPartition),
|
||||||
|
|
||||||
|
@ -255,6 +271,19 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||||
|
|
||||||
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
||||||
}
|
}
|
||||||
|
Command::CompactObjectStorePartition(compact) => {
|
||||||
|
let CompactObjectStorePartition {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
} = compact;
|
||||||
|
|
||||||
|
let operation = client
|
||||||
|
.compact_object_store_partition(db_name, table_name, partition_key)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
||||||
|
}
|
||||||
Command::Drop(drop_partition) => {
|
Command::Drop(drop_partition) => {
|
||||||
let DropPartition {
|
let DropPartition {
|
||||||
db_name,
|
db_name,
|
||||||
|
|
|
@ -535,6 +535,7 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
Ok(Response::new(DropPartitionResponse {}))
|
Ok(Response::new(DropPartitionResponse {}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all given object store chunks
|
||||||
async fn compact_object_store_chunks(
|
async fn compact_object_store_chunks(
|
||||||
&self,
|
&self,
|
||||||
request: Request<CompactObjectStoreChunksRequest>,
|
request: Request<CompactObjectStoreChunksRequest>,
|
||||||
|
@ -570,6 +571,36 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
operation,
|
operation,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compact all object store chunks of the given partition
|
||||||
|
async fn compact_object_store_partition(
|
||||||
|
&self,
|
||||||
|
request: Request<CompactObjectStorePartitionRequest>,
|
||||||
|
) -> Result<Response<CompactObjectStorePartitionResponse>, Status> {
|
||||||
|
let CompactObjectStorePartitionRequest {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
} = request.into_inner();
|
||||||
|
|
||||||
|
// Validate that the database name is legit
|
||||||
|
let db_name = DatabaseName::new(db_name).scope("db_name")?;
|
||||||
|
|
||||||
|
let db = self
|
||||||
|
.server
|
||||||
|
.db(&db_name)
|
||||||
|
.map_err(default_server_error_handler)?;
|
||||||
|
|
||||||
|
let tracker = db
|
||||||
|
.compact_object_store_partition(&table_name, &partition_key)
|
||||||
|
.map_err(default_db_error_handler)?;
|
||||||
|
|
||||||
|
let operation = Some(super::operations::encode_tracker(tracker)?);
|
||||||
|
|
||||||
|
Ok(Response::new(CompactObjectStorePartitionResponse {
|
||||||
|
operation,
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is
|
/// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
//! CLI handling for object store config (via CLI arguments and environment variables).
|
//! CLI handling for object store config (via CLI arguments and environment variables).
|
||||||
use std::{convert::TryFrom, fs, path::PathBuf, time::Duration};
|
use std::{convert::TryFrom, fs, num::NonZeroUsize, path::PathBuf, time::Duration};
|
||||||
|
|
||||||
use clap::arg_enum;
|
use clap::arg_enum;
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
|
@ -172,6 +172,14 @@ Possible values (case insensitive):
|
||||||
/// environments.
|
/// environments.
|
||||||
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
|
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
|
||||||
pub azure_storage_access_key: Option<String>,
|
pub azure_storage_access_key: Option<String>,
|
||||||
|
|
||||||
|
/// When using a network-based object store, limit the number of connection to this value.
|
||||||
|
#[structopt(
|
||||||
|
long = "--object-store-connection-limit",
|
||||||
|
env = "OBJECT_STORE_CONNECTION_LIMIT",
|
||||||
|
default_value = "16"
|
||||||
|
)]
|
||||||
|
pub object_store_connection_limit: NonZeroUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
arg_enum! {
|
arg_enum! {
|
||||||
|
@ -267,6 +275,7 @@ impl TryFrom<&ObjectStoreConfig> for ObjectStore {
|
||||||
bucket,
|
bucket,
|
||||||
endpoint,
|
endpoint,
|
||||||
session_token,
|
session_token,
|
||||||
|
config.object_store_connection_limit,
|
||||||
)
|
)
|
||||||
.context(InvalidS3Config)
|
.context(InvalidS3Config)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,8 @@ use uuid::Uuid;
|
||||||
use crate::{
|
use crate::{
|
||||||
common::server_fixture::{ServerFixture, ServerType},
|
common::server_fixture::{ServerFixture, ServerType},
|
||||||
end_to_end_cases::scenario::{
|
end_to_end_cases::scenario::{
|
||||||
collect_query, create_readable_database, data_dir, db_data_dir, rand_name,
|
create_readable_database, data_dir, db_data_dir, rand_name, wait_for_database_initialized,
|
||||||
wait_for_database_initialized, wait_for_exact_chunk_states,
|
wait_for_exact_chunk_states, wait_for_operations_to_complete,
|
||||||
wait_for_operations_to_complete,
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -234,13 +233,14 @@ async fn migrate_table_files_from_one_server_to_another() {
|
||||||
wait_for_database_initialized(&fixture, &db_name, Duration::from_secs(5)).await;
|
wait_for_database_initialized(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
// Now the data shoudl be available for the_table
|
// Now the data shoudl be available for the_table
|
||||||
let query_results = flight_client
|
let batches = flight_client
|
||||||
.perform_query(&db_name, sql_query)
|
.perform_query(&db_name, sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
|
|
||||||
let expected = vec![
|
let expected = vec![
|
||||||
"+-----------------+",
|
"+-----------------+",
|
||||||
"| COUNT(UInt8(1)) |",
|
"| COUNT(UInt8(1)) |",
|
||||||
|
|
|
@ -52,7 +52,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let expected = [
|
let expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -86,7 +86,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let expected = [
|
let expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -104,7 +104,7 @@ async fn test_delete_on_database() {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
// result should be as above
|
// result should be as above
|
||||||
assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu where user!=21")
|
.perform_query(db_name.clone(), "select * from cpu where user!=21")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
// result should be nothing
|
// result should be nothing
|
||||||
let expected = ["++", "++"];
|
let expected = ["++", "++"];
|
||||||
assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
|
@ -135,7 +135,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let cpu_expected = [
|
let cpu_expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -149,7 +149,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from disk")
|
.perform_query(db_name.clone(), "select * from disk")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let disk_expected = [
|
let disk_expected = [
|
||||||
"+-------+--------+--------------------------------+",
|
"+-------+--------+--------------------------------+",
|
||||||
"| bytes | region | time |",
|
"| bytes | region | time |",
|
||||||
|
|
|
@ -129,7 +129,16 @@ async fn assert_set_get_server_id(server_fixture: ServerFixture) {
|
||||||
let got = client.get_server_id().await.expect("get ID failed");
|
let got = client.get_server_id().await.expect("get ID failed");
|
||||||
assert_eq!(got, Some(test_id));
|
assert_eq!(got, Some(test_id));
|
||||||
|
|
||||||
// setting server ID a second time should fail
|
// setting server ID to same ID should be OK
|
||||||
|
client
|
||||||
|
.update_server_id(test_id)
|
||||||
|
.await
|
||||||
|
.expect("set ID again failed");
|
||||||
|
|
||||||
|
let got = client.get_server_id().await.expect("get ID failed");
|
||||||
|
assert_eq!(got, Some(test_id));
|
||||||
|
|
||||||
|
// setting server ID to a different ID should fail
|
||||||
let result = client
|
let result = client
|
||||||
.update_server_id(NonZeroU32::try_from(13).unwrap())
|
.update_server_id(NonZeroU32::try_from(13).unwrap())
|
||||||
.await;
|
.await;
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::scenario::{collect_query, create_readable_database, rand_name, Scenario};
|
use super::scenario::{create_readable_database, rand_name, Scenario};
|
||||||
use crate::common::server_fixture::{ServerFixture, ServerType};
|
use crate::common::server_fixture::{ServerFixture, ServerType};
|
||||||
use arrow_util::assert_batches_sorted_eq;
|
use arrow_util::assert_batches_sorted_eq;
|
||||||
|
|
||||||
|
@ -20,13 +20,14 @@ pub async fn test() {
|
||||||
// This does nothing except test the client handshake implementation.
|
// This does nothing except test the client handshake implementation.
|
||||||
client.handshake().await.unwrap();
|
client.handshake().await.unwrap();
|
||||||
|
|
||||||
let query_results = client
|
let batches = client
|
||||||
.perform_query(scenario.database_name(), sql_query)
|
.perform_query(scenario.database_name(), sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
|
|
||||||
let expected_read_data: Vec<_> = expected_read_data.iter().map(|s| s.as_str()).collect();
|
let expected_read_data: Vec<_> = expected_read_data.iter().map(|s| s.as_str()).collect();
|
||||||
assert_batches_sorted_eq!(expected_read_data, &batches);
|
assert_batches_sorted_eq!(expected_read_data, &batches);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
use crate::common::server_fixture::{ServerFixture, ServerType};
|
||||||
|
use influxdb_iox_client::management::generated_types::*;
|
||||||
|
use std::time::Instant;
|
||||||
|
use test_helpers::assert_contains;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_create_database_invalid_kafka() {
|
||||||
|
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||||
|
let mut client = server_fixture.management_client();
|
||||||
|
|
||||||
|
let rules = DatabaseRules {
|
||||||
|
name: "db_with_bad_kafka_address".into(),
|
||||||
|
write_buffer_connection: Some(WriteBufferConnection {
|
||||||
|
r#type: "kafka".into(),
|
||||||
|
connection: "i_am_not_a_kafka_server:1234".into(),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let start = Instant::now();
|
||||||
|
let err = client
|
||||||
|
.create_database(rules)
|
||||||
|
.await
|
||||||
|
.expect_err("expected request to fail");
|
||||||
|
|
||||||
|
println!("Failed after {:?}", Instant::now() - start);
|
||||||
|
|
||||||
|
// expect that this error has a useful error related to kafka (not "timeout")
|
||||||
|
assert_contains!(
|
||||||
|
err.to_string(),
|
||||||
|
"error creating write buffer: Meta data fetch error: BrokerTransportFailure"
|
||||||
|
);
|
||||||
|
}
|
|
@ -1,3 +1,14 @@
|
||||||
|
use crate::{
|
||||||
|
common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
|
||||||
|
end_to_end_cases::{
|
||||||
|
management_cli::setup_load_and_persist_two_partition_chunks,
|
||||||
|
scenario::{
|
||||||
|
create_readable_database, create_two_partition_database, create_unreadable_database,
|
||||||
|
fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use bytes::Bytes;
|
||||||
use data_types::chunk_metadata::ChunkId;
|
use data_types::chunk_metadata::ChunkId;
|
||||||
use generated_types::google::protobuf::{Duration, Empty};
|
use generated_types::google::protobuf::{Duration, Empty};
|
||||||
use influxdb_iox_client::{
|
use influxdb_iox_client::{
|
||||||
|
@ -7,22 +18,9 @@ use influxdb_iox_client::{
|
||||||
generated_types::{database_status::DatabaseState, operation_metadata::Job, *},
|
generated_types::{database_status::DatabaseState, operation_metadata::Job, *},
|
||||||
Client,
|
Client,
|
||||||
},
|
},
|
||||||
router::generated_types::WriteBufferConnection,
|
|
||||||
};
|
};
|
||||||
use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
|
use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt, time::Instant};
|
||||||
use test_helpers::{assert_contains, assert_error};
|
use test_helpers::{assert_contains, assert_error};
|
||||||
|
|
||||||
use super::scenario::{
|
|
||||||
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
|
|
||||||
};
|
|
||||||
use crate::common::server_fixture::{TestConfig, DEFAULT_SERVER_ID};
|
|
||||||
use crate::{
|
|
||||||
common::server_fixture::{ServerFixture, ServerType},
|
|
||||||
end_to_end_cases::scenario::{
|
|
||||||
fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use std::time::Instant;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -85,36 +83,6 @@ async fn test_create_database_invalid_name() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_create_database_invalid_kafka() {
|
|
||||||
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
|
||||||
let mut client = server_fixture.management_client();
|
|
||||||
|
|
||||||
let rules = DatabaseRules {
|
|
||||||
name: "db_with_bad_kafka_address".into(),
|
|
||||||
write_buffer_connection: Some(WriteBufferConnection {
|
|
||||||
r#type: "kafka".into(),
|
|
||||||
connection: "i_am_not_a_kafka_server:1234".into(),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let start = Instant::now();
|
|
||||||
let err = client
|
|
||||||
.create_database(rules)
|
|
||||||
.await
|
|
||||||
.expect_err("expected request to fail");
|
|
||||||
|
|
||||||
println!("Failed after {:?}", Instant::now() - start);
|
|
||||||
|
|
||||||
// expect that this error has a useful error related to kafka (not "timeout")
|
|
||||||
assert_contains!(
|
|
||||||
err.to_string(),
|
|
||||||
"error creating write buffer: Meta data fetch error: BrokerTransportFailure"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_list_databases() {
|
async fn test_list_databases() {
|
||||||
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||||
|
@ -1777,143 +1745,24 @@ async fn test_persist_partition_error() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_compact_os_chunks() {
|
async fn test_compact_os_chunks() {
|
||||||
use data_types::chunk_metadata::ChunkStorage;
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
let fixture = ServerFixture::create_shared(ServerType::Database).await;
|
assert!(chunk_ids.len() > 1);
|
||||||
let mut write_client = fixture.write_client();
|
|
||||||
let mut management_client = fixture.management_client();
|
let mut management_client = fixture.management_client();
|
||||||
let mut operations_client = fixture.operations_client();
|
let mut operations_client = fixture.operations_client();
|
||||||
|
|
||||||
let db_name = rand_name();
|
let c_ids: Vec<Bytes> = chunk_ids
|
||||||
DatabaseBuilder::new(db_name.clone())
|
.iter()
|
||||||
.persist(true)
|
.map(|id| {
|
||||||
.persist_age_threshold_seconds(1_000)
|
let id_uuid = Uuid::parse_str(id).unwrap();
|
||||||
.late_arrive_window_seconds(1)
|
id_uuid.as_bytes().to_vec().into()
|
||||||
.build(fixture.grpc_channel())
|
})
|
||||||
.await;
|
.collect();
|
||||||
|
|
||||||
// Chunk 1
|
// Compact all 2 OS chunks of the partition
|
||||||
let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"];
|
// note that both partition and table_name are "cpu" in the setup
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("write succeded");
|
|
||||||
assert_eq!(num_lines_written, 2);
|
|
||||||
|
|
||||||
wait_for_exact_chunk_states(
|
|
||||||
&fixture,
|
|
||||||
&db_name,
|
|
||||||
vec![ChunkStorage::OpenMutableBuffer],
|
|
||||||
std::time::Duration::from_secs(5),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let partition_key = &chunks[0].partition_key;
|
|
||||||
|
|
||||||
management_client
|
|
||||||
.persist_partition(&db_name, "cpu", &partition_key[..], true)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
// chunk 2
|
|
||||||
let lp_lines = vec![
|
|
||||||
"cpu,tag1=cookies bar=2 20",
|
|
||||||
"cpu,tag1=cookies bar=3 30", // duplicate
|
|
||||||
"cpu,tag1=cupcakes bar=2 20",
|
|
||||||
];
|
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("write succeded");
|
|
||||||
assert_eq!(num_lines_written, 3);
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
let partition_key = &chunks[0].partition_key;
|
|
||||||
|
|
||||||
management_client
|
|
||||||
.persist_partition(&db_name, "cpu", &partition_key[..], true)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
// ensure chunk in deterministic order
|
|
||||||
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[1].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
let chunk_id_1 = chunks[0].id.clone();
|
|
||||||
let partition_key_1 = &chunks[0].partition_key;
|
|
||||||
let chunk_id_2 = chunks[1].id.clone();
|
|
||||||
let partition_key_2 = &chunks[1].partition_key;
|
|
||||||
assert_eq!(partition_key_1, partition_key_2);
|
|
||||||
|
|
||||||
// unload both RUBs
|
|
||||||
management_client
|
|
||||||
.unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
management_client
|
|
||||||
.unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// verify chunk status again
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[1].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
// Compact 2 chunks
|
|
||||||
let iox_operation = management_client
|
let iox_operation = management_client
|
||||||
.compact_object_store_chunks(
|
.compact_object_store_chunks(&db_name, "cpu", "cpu", c_ids.clone())
|
||||||
&db_name,
|
|
||||||
"cpu",
|
|
||||||
&partition_key_1[..],
|
|
||||||
vec![chunk_id_1.clone(), chunk_id_2.clone()],
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -1923,7 +1772,7 @@ async fn test_compact_os_chunks() {
|
||||||
match iox_operation.metadata.job {
|
match iox_operation.metadata.job {
|
||||||
Some(Job::CompactObjectStoreChunks(job)) => {
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
assert_eq!(&job.db_name, &db_name);
|
assert_eq!(&job.db_name, &db_name);
|
||||||
assert_eq!(job.partition_key.as_str(), partition_key_1);
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
assert_eq!(job.table_name.as_str(), "cpu");
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
}
|
}
|
||||||
job => panic!("unexpected job returned {:#?}", job),
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
@ -1946,6 +1795,55 @@ async fn test_compact_os_chunks() {
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
||||||
);
|
);
|
||||||
let new_chunk_id = chunks[0].id.clone();
|
let new_chunk_id = chunks[0].id.clone();
|
||||||
assert_ne!(new_chunk_id, chunk_id_1);
|
assert_ne!(new_chunk_id, c_ids[0]);
|
||||||
assert_ne!(new_chunk_id, chunk_id_2);
|
assert_ne!(new_chunk_id, c_ids[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_partition() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
let mut management_client = fixture.management_client();
|
||||||
|
let mut operations_client = fixture.operations_client();
|
||||||
|
|
||||||
|
// Compact all 2 OS chunks of the partition
|
||||||
|
// note that both partition and table_name are "cpu" in the setup
|
||||||
|
let iox_operation = management_client
|
||||||
|
.compact_object_store_partition(&db_name, "cpu", "cpu")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let operation_id = iox_operation.operation.id();
|
||||||
|
|
||||||
|
// ensure we got a legit job description back
|
||||||
|
// note that since compact_object_store_partition invokes compact_object_store_chunks,
|
||||||
|
// its job is recorded as CompactObjectStoreChunks
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the job to be done
|
||||||
|
operations_client
|
||||||
|
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
|
||||||
|
.await
|
||||||
|
.expect("failed to wait operation");
|
||||||
|
|
||||||
|
// verify chunks after compaction
|
||||||
|
let chunks = management_client
|
||||||
|
.list_chunks(&db_name)
|
||||||
|
.await
|
||||||
|
.expect("listing chunks");
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
chunks[0].storage,
|
||||||
|
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
||||||
|
);
|
||||||
|
let new_chunk_id = chunks[0].id.clone();
|
||||||
|
assert_ne!(new_chunk_id, chunk_ids[0]);
|
||||||
|
assert_ne!(new_chunk_id, chunk_ids[1]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
||||||
common::server_fixture::{ServerFixture, ServerType},
|
common::server_fixture::{ServerFixture, ServerType},
|
||||||
end_to_end_cases::scenario::{
|
end_to_end_cases::scenario::{
|
||||||
fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states,
|
fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states,
|
||||||
DatabaseBuilder,
|
wait_for_operations_to_complete, DatabaseBuilder,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use assert_cmd::Command;
|
use assert_cmd::Command;
|
||||||
|
@ -1547,3 +1547,138 @@ async fn test_persist_partition_error() {
|
||||||
"Cannot persist partition because it cannot be flushed at the moment",
|
"Cannot persist partition because it cannot be flushed at the moment",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_partition() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, addr, _chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
|
||||||
|
// Compact the partition which will compact those 2 chunks
|
||||||
|
let iox_operation: IoxOperation = serde_json::from_slice(
|
||||||
|
&Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("database")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("compact-object-store-partition")
|
||||||
|
.arg(&db_name)
|
||||||
|
.arg("cpu") // partition key
|
||||||
|
.arg("cpu") // table name
|
||||||
|
//.arg(chunk_ids)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.get_output()
|
||||||
|
.stdout,
|
||||||
|
)
|
||||||
|
.expect("Expected JSON output");
|
||||||
|
|
||||||
|
// Ensure we got a legit job description back
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(job.chunks.len(), 2);
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
// Wait for the compaction to complete
|
||||||
|
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
// Verify chunk the DB now only has one OS-only chunk
|
||||||
|
let chunks = list_chunks(&fixture, &db_name).await;
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_chunks() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
|
||||||
|
// Compact the partition which will compact those 2 chunks
|
||||||
|
let iox_operation: IoxOperation = serde_json::from_slice(
|
||||||
|
&Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("database")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("compact-object-store-chunks")
|
||||||
|
.arg(&db_name)
|
||||||
|
.arg("cpu") // partition key
|
||||||
|
.arg("cpu") // table name
|
||||||
|
.arg(chunk_ids[0].clone())
|
||||||
|
.arg(chunk_ids[1].clone())
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.get_output()
|
||||||
|
.stdout,
|
||||||
|
)
|
||||||
|
.expect("Expected JSON output");
|
||||||
|
|
||||||
|
// Ensure we got a legit job description back
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(job.chunks.len(), 2);
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
// Wait for the compaction to complete
|
||||||
|
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
// Verify chunk the DB now only has one OS-only chunk
|
||||||
|
let chunks = list_chunks(&fixture, &db_name).await;
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn setup_load_and_persist_two_partition_chunks(
|
||||||
|
) -> (Arc<ServerFixture>, String, String, Vec<String>) {
|
||||||
|
let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await);
|
||||||
|
let addr = fixture.grpc_base();
|
||||||
|
let db_name = rand_name();
|
||||||
|
|
||||||
|
DatabaseBuilder::new(db_name.clone())
|
||||||
|
.persist(true)
|
||||||
|
.persist_age_threshold_seconds(1)
|
||||||
|
.late_arrive_window_seconds(1)
|
||||||
|
.build(fixture.grpc_channel())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Load first chunk and wait for it to get persisted
|
||||||
|
let lp_data = vec!["cpu,region=west user=23.2 10"];
|
||||||
|
load_lp(addr, &db_name, lp_data);
|
||||||
|
|
||||||
|
wait_for_exact_chunk_states(
|
||||||
|
&fixture,
|
||||||
|
&db_name,
|
||||||
|
vec![ChunkStorage::ReadBufferAndObjectStore],
|
||||||
|
std::time::Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Load second chunk and wait for it to get persisted, too
|
||||||
|
let lp_data = vec!["cpu,region=east user=79 30"];
|
||||||
|
load_lp(addr, &db_name, lp_data);
|
||||||
|
|
||||||
|
let chunks = wait_for_exact_chunk_states(
|
||||||
|
&fixture,
|
||||||
|
&db_name,
|
||||||
|
vec![
|
||||||
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
|
],
|
||||||
|
std::time::Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// collect chunk ids
|
||||||
|
let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.get().to_string()).collect();
|
||||||
|
|
||||||
|
(Arc::clone(&fixture), db_name, String::from(addr), chunk_ids)
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,10 @@ mod flight_api;
|
||||||
mod freeze;
|
mod freeze;
|
||||||
mod http;
|
mod http;
|
||||||
mod influxdb_ioxd;
|
mod influxdb_ioxd;
|
||||||
|
|
||||||
|
#[cfg(feature = "kafka")]
|
||||||
|
mod kafka;
|
||||||
|
|
||||||
mod management_api;
|
mod management_api;
|
||||||
mod management_cli;
|
mod management_cli;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
|
|
|
@ -10,7 +10,7 @@ use crate::{
|
||||||
end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states},
|
end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder};
|
use super::scenario::{create_readable_database, rand_name, DatabaseBuilder};
|
||||||
use crate::common::server_fixture::DEFAULT_SERVER_ID;
|
use crate::common::server_fixture::DEFAULT_SERVER_ID;
|
||||||
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};
|
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};
|
||||||
|
|
||||||
|
@ -310,9 +310,14 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
|
||||||
let mut client = fixture.flight_client();
|
let mut client = fixture.flight_client();
|
||||||
let sql_query = "select region, user, time from cpu";
|
let sql_query = "select region, user, time from cpu";
|
||||||
|
|
||||||
let query_results = client.perform_query(db_name, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(db_name, sql_query)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
"+--------+------+--------------------------------+",
|
"+--------+------+--------------------------------+",
|
||||||
"| region | user | time |",
|
"| region | user | time |",
|
||||||
|
|
|
@ -15,7 +15,6 @@ use generated_types::{
|
||||||
};
|
};
|
||||||
use influxdb_iox_client::{
|
use influxdb_iox_client::{
|
||||||
connection::Connection,
|
connection::Connection,
|
||||||
flight::PerformQuery,
|
|
||||||
management::{
|
management::{
|
||||||
self,
|
self,
|
||||||
generated_types::{partition_template, WriteBufferConnection},
|
generated_types::{partition_template, WriteBufferConnection},
|
||||||
|
@ -468,15 +467,6 @@ pub async fn create_two_partition_database(db_name: impl Into<String>, channel:
|
||||||
.expect("write succeded");
|
.expect("write succeded");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collect the results of a query into a vector of record batches
|
|
||||||
pub async fn collect_query(mut query_results: PerformQuery) -> Vec<RecordBatch> {
|
|
||||||
let mut batches = vec![];
|
|
||||||
while let Some(data) = query_results.next().await.unwrap() {
|
|
||||||
batches.push(data);
|
|
||||||
}
|
|
||||||
batches
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wait for the chunks to be in exactly `desired_storages` states
|
/// Wait for the chunks to be in exactly `desired_storages` states
|
||||||
pub async fn wait_for_exact_chunk_states(
|
pub async fn wait_for_exact_chunk_states(
|
||||||
fixture: &ServerFixture,
|
fixture: &ServerFixture,
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use arrow_util::{assert_batches_eq, test_util::normalize_batches};
|
use arrow_util::{assert_batches_eq, test_util::normalize_batches};
|
||||||
|
|
||||||
use super::scenario::{collect_query, create_readable_database, list_chunks, rand_name};
|
use super::scenario::{create_readable_database, list_chunks, rand_name};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_operations() {
|
async fn test_operations() {
|
||||||
|
@ -47,9 +47,13 @@ async fn test_operations() {
|
||||||
let mut client = fixture.flight_client();
|
let mut client = fixture.flight_client();
|
||||||
let sql_query = "select status, description from system.operations";
|
let sql_query = "select status, description from system.operations";
|
||||||
|
|
||||||
let query_results = client.perform_query(&db_name1, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(&db_name1, sql_query)
|
||||||
let batches = collect_query(query_results).await;
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// parameterize on db_name1
|
// parameterize on db_name1
|
||||||
|
|
||||||
|
@ -64,9 +68,14 @@ async fn test_operations() {
|
||||||
assert_batches_eq!(expected_read_data, &batches);
|
assert_batches_eq!(expected_read_data, &batches);
|
||||||
|
|
||||||
// Should not see jobs from db1 when querying db2
|
// Should not see jobs from db1 when querying db2
|
||||||
let query_results = client.perform_query(&db_name2, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(&db_name2, sql_query)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
"+--------+-------------+",
|
"+--------+-------------+",
|
||||||
"| status | description |",
|
"| status | description |",
|
||||||
|
@ -109,13 +118,14 @@ async fn test_queries() {
|
||||||
let query = "select query_type, query_text from system.queries";
|
let query = "select query_type, query_text from system.queries";
|
||||||
|
|
||||||
// Query system.queries and should have an entry for the storage rpc
|
// Query system.queries and should have an entry for the storage rpc
|
||||||
let query_results = fixture
|
let batches = fixture
|
||||||
.flight_client()
|
.flight_client()
|
||||||
.perform_query(&db_name, query)
|
.perform_query(&db_name, query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let batches = normalize_batches(batches, scenario.normalizer());
|
let batches = normalize_batches(batches, scenario.normalizer());
|
||||||
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::scenario::{collect_query, Scenario};
|
use super::scenario::Scenario;
|
||||||
use crate::common::{
|
use crate::common::{
|
||||||
server_fixture::{ServerFixture, ServerType, TestConfig},
|
server_fixture::{ServerFixture, ServerType, TestConfig},
|
||||||
udp_listener::UdpCapture,
|
udp_listener::UdpCapture,
|
||||||
|
@ -33,6 +33,7 @@ async fn setup() -> (UdpCapture, ServerFixture) {
|
||||||
(udp_capture, server_fixture)
|
(udp_capture, server_fixture)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runs a query, discarding the results
|
||||||
async fn run_sql_query(server_fixture: &ServerFixture) {
|
async fn run_sql_query(server_fixture: &ServerFixture) {
|
||||||
let scenario = Scenario::new();
|
let scenario = Scenario::new();
|
||||||
scenario
|
scenario
|
||||||
|
@ -44,12 +45,13 @@ async fn run_sql_query(server_fixture: &ServerFixture) {
|
||||||
let sql_query = "select * from cpu_load_short";
|
let sql_query = "select * from cpu_load_short";
|
||||||
let mut client = server_fixture.flight_client();
|
let mut client = server_fixture.flight_client();
|
||||||
|
|
||||||
let query_results = client
|
client
|
||||||
.perform_query(scenario.database_name(), sql_query)
|
.perform_query(scenario.database_name(), sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
collect_query(query_results).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -218,8 +218,8 @@ impl PerformQuery {
|
||||||
)?))
|
)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return all record batches of it
|
/// Collect and return all `RecordBatch`es into a `Vec`
|
||||||
pub async fn to_batches(&mut self) -> Result<Vec<RecordBatch>, Error> {
|
pub async fn collect(&mut self) -> Result<Vec<RecordBatch>, Error> {
|
||||||
let mut batches = Vec::new();
|
let mut batches = Vec::new();
|
||||||
while let Some(data) = self.next().await? {
|
while let Some(data) = self.next().await? {
|
||||||
batches.push(data);
|
batches.push(data);
|
||||||
|
|
|
@ -535,4 +535,31 @@ impl Client {
|
||||||
.unwrap_field("operation")?
|
.unwrap_field("operation")?
|
||||||
.try_into()?)
|
.try_into()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all object store of a give partition
|
||||||
|
pub async fn compact_object_store_partition(
|
||||||
|
&mut self,
|
||||||
|
db_name: impl Into<String> + Send,
|
||||||
|
table_name: impl Into<String> + Send,
|
||||||
|
partition_key: impl Into<String> + Send,
|
||||||
|
) -> Result<IoxOperation, Error> {
|
||||||
|
let db_name = db_name.into();
|
||||||
|
let partition_key = partition_key.into();
|
||||||
|
let table_name = table_name.into();
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.inner
|
||||||
|
.compact_object_store_partition(CompactObjectStorePartitionRequest {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(response
|
||||||
|
.into_inner()
|
||||||
|
.operation
|
||||||
|
.unwrap_field("operation")?
|
||||||
|
.try_into()?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,10 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||||
# Google Cloud Storage integration
|
# Google Cloud Storage integration
|
||||||
cloud-storage = {version = "0.10.3", optional = true}
|
cloud-storage = {version = "0.10.3", optional = true}
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
# for rusoto
|
||||||
|
hyper = { version = "0.14", optional = true, default-features = false }
|
||||||
|
# for rusoto
|
||||||
|
hyper-tls = { version = "0.5.0", optional = true, default-features = false }
|
||||||
indexmap = { version = "1.7", optional = true, features = ["std"] }
|
indexmap = { version = "1.7", optional = true, features = ["std"] }
|
||||||
itertools = "0.10.1"
|
itertools = "0.10.1"
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
|
@ -36,7 +40,7 @@ workspace-hack = { path = "../workspace-hack"}
|
||||||
[features]
|
[features]
|
||||||
azure = ["azure_core", "azure_storage", "indexmap", "reqwest"]
|
azure = ["azure_core", "azure_storage", "indexmap", "reqwest"]
|
||||||
gcp = ["cloud-storage"]
|
gcp = ["cloud-storage"]
|
||||||
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3"]
|
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "hyper", "hyper-tls"]
|
||||||
|
|
||||||
[dev-dependencies] # In alphabetical order
|
[dev-dependencies] # In alphabetical order
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
|
|
|
@ -11,12 +11,15 @@ use futures::{
|
||||||
stream::{self, BoxStream},
|
stream::{self, BoxStream},
|
||||||
Future, StreamExt, TryStreamExt,
|
Future, StreamExt, TryStreamExt,
|
||||||
};
|
};
|
||||||
|
use hyper::client::Builder as HyperBuilder;
|
||||||
|
use hyper_tls::HttpsConnector;
|
||||||
use observability_deps::tracing::{debug, warn};
|
use observability_deps::tracing::{debug, warn};
|
||||||
use rusoto_core::ByteStream;
|
use rusoto_core::ByteStream;
|
||||||
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
|
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
|
||||||
use rusoto_s3::S3;
|
use rusoto_s3::S3;
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
use std::{convert::TryFrom, fmt, time::Duration};
|
use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration};
|
||||||
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||||
|
|
||||||
/// A specialized `Result` for object store-related errors
|
/// A specialized `Result` for object store-related errors
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
@ -140,7 +143,15 @@ pub enum Error {
|
||||||
|
|
||||||
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
|
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
|
||||||
pub struct AmazonS3 {
|
pub struct AmazonS3 {
|
||||||
client: rusoto_s3::S3Client,
|
/// S3 client w/o any connection limit.
|
||||||
|
///
|
||||||
|
/// You should normally use [`Self::client`] instead.
|
||||||
|
client_unrestricted: rusoto_s3::S3Client,
|
||||||
|
|
||||||
|
/// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
|
||||||
|
connection_semaphore: Arc<Semaphore>,
|
||||||
|
|
||||||
|
/// Bucket name used by this object store client.
|
||||||
bucket_name: String,
|
bucket_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +196,7 @@ impl ObjectStoreApi for AmazonS3 {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let s3 = self.client.clone();
|
let s3 = self.client().await;
|
||||||
|
|
||||||
s3_request(move || {
|
s3_request(move || {
|
||||||
let (s3, request_factory) = (s3.clone(), request_factory.clone());
|
let (s3, request_factory) = (s3.clone(), request_factory.clone());
|
||||||
|
@ -210,7 +221,8 @@ impl ObjectStoreApi for AmazonS3 {
|
||||||
};
|
};
|
||||||
let bucket_name = self.bucket_name.clone();
|
let bucket_name = self.bucket_name.clone();
|
||||||
let s = self
|
let s = self
|
||||||
.client
|
.client()
|
||||||
|
.await
|
||||||
.get_object(get_request)
|
.get_object(get_request)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| match e {
|
.map_err(|e| match e {
|
||||||
|
@ -252,7 +264,7 @@ impl ObjectStoreApi for AmazonS3 {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let s3 = self.client.clone();
|
let s3 = self.client().await;
|
||||||
|
|
||||||
s3_request(move || {
|
s3_request(move || {
|
||||||
let (s3, request_factory) = (s3.clone(), request_factory.clone());
|
let (s3, request_factory) = (s3.clone(), request_factory.clone());
|
||||||
|
@ -357,6 +369,7 @@ pub(crate) fn new_s3(
|
||||||
bucket_name: impl Into<String>,
|
bucket_name: impl Into<String>,
|
||||||
endpoint: Option<impl Into<String>>,
|
endpoint: Option<impl Into<String>>,
|
||||||
session_token: Option<impl Into<String>>,
|
session_token: Option<impl Into<String>>,
|
||||||
|
max_connections: NonZeroUsize,
|
||||||
) -> Result<AmazonS3> {
|
) -> Result<AmazonS3> {
|
||||||
let region = region.into();
|
let region = region.into();
|
||||||
let region: rusoto_core::Region = match endpoint {
|
let region: rusoto_core::Region = match endpoint {
|
||||||
|
@ -367,8 +380,10 @@ pub(crate) fn new_s3(
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let http_client = rusoto_core::request::HttpClient::new()
|
let mut builder = HyperBuilder::default();
|
||||||
.expect("Current implementation of rusoto_core has no way for this to fail");
|
builder.pool_max_idle_per_host(max_connections.get());
|
||||||
|
let connector = HttpsConnector::new();
|
||||||
|
let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector);
|
||||||
|
|
||||||
let client = match (access_key_id, secret_access_key, session_token) {
|
let client = match (access_key_id, secret_access_key, session_token) {
|
||||||
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
|
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
|
||||||
|
@ -394,7 +409,8 @@ pub(crate) fn new_s3(
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(AmazonS3 {
|
Ok(AmazonS3 {
|
||||||
client,
|
client_unrestricted: client,
|
||||||
|
connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
|
||||||
bucket_name: bucket_name.into(),
|
bucket_name: bucket_name.into(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -407,10 +423,43 @@ pub(crate) fn new_failing_s3() -> Result<AmazonS3> {
|
||||||
"bucket",
|
"bucket",
|
||||||
None as Option<&str>,
|
None as Option<&str>,
|
||||||
None as Option<&str>,
|
None as Option<&str>,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// S3 client bundled w/ a semaphore permit.
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct SemaphoreClient {
|
||||||
|
/// Permit for this specific use of the client.
|
||||||
|
///
|
||||||
|
/// Note that this field is never read and therefore considered "dead code" by rustc.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
permit: Arc<OwnedSemaphorePermit>,
|
||||||
|
|
||||||
|
inner: rusoto_s3::S3Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for SemaphoreClient {
|
||||||
|
type Target = rusoto_s3::S3Client;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl AmazonS3 {
|
impl AmazonS3 {
|
||||||
|
/// Get a client according to the current connection limit.
|
||||||
|
async fn client(&self) -> SemaphoreClient {
|
||||||
|
let permit = Arc::clone(&self.connection_semaphore)
|
||||||
|
.acquire_owned()
|
||||||
|
.await
|
||||||
|
.expect("semaphore shouldn't be closed yet");
|
||||||
|
SemaphoreClient {
|
||||||
|
permit: Arc::new(permit),
|
||||||
|
inner: self.client_unrestricted.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn list_objects_v2(
|
async fn list_objects_v2(
|
||||||
&self,
|
&self,
|
||||||
prefix: Option<&CloudPath>,
|
prefix: Option<&CloudPath>,
|
||||||
|
@ -433,10 +482,11 @@ impl AmazonS3 {
|
||||||
delimiter,
|
delimiter,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
let s3 = self.client().await;
|
||||||
|
|
||||||
Ok(stream::unfold(ListState::Start, move |state| {
|
Ok(stream::unfold(ListState::Start, move |state| {
|
||||||
let request_factory = request_factory.clone();
|
let request_factory = request_factory.clone();
|
||||||
let s3 = self.client.clone();
|
let s3 = s3.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let continuation_token = match state.clone() {
|
let continuation_token = match state.clone() {
|
||||||
|
@ -685,6 +735,7 @@ mod tests {
|
||||||
config.bucket,
|
config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -705,6 +756,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -735,6 +787,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -776,6 +829,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -812,6 +866,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -850,6 +905,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -886,6 +942,7 @@ mod tests {
|
||||||
config.bucket,
|
config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -910,6 +967,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
@ -946,6 +1004,7 @@ mod tests {
|
||||||
&config.bucket,
|
&config.bucket,
|
||||||
config.endpoint,
|
config.endpoint,
|
||||||
config.token,
|
config.token,
|
||||||
|
NonZeroUsize::new(16).unwrap(),
|
||||||
)
|
)
|
||||||
.expect("Valid S3 config");
|
.expect("Valid S3 config");
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
//! Crate that mimics the interface of the the various object stores
|
//! Crate that mimics the interface of the the various object stores
|
||||||
//! but does nothing if they are not enabled.
|
//! but does nothing if they are not enabled.
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
|
@ -89,6 +91,7 @@ pub(crate) fn new_s3(
|
||||||
_bucket_name: impl Into<String>,
|
_bucket_name: impl Into<String>,
|
||||||
_endpoint: Option<impl Into<String>>,
|
_endpoint: Option<impl Into<String>>,
|
||||||
_session_token: Option<impl Into<String>>,
|
_session_token: Option<impl Into<String>>,
|
||||||
|
_max_connections: NonZeroUsize,
|
||||||
) -> Result<DummyObjectStore> {
|
) -> Result<DummyObjectStore> {
|
||||||
NotSupported { name: "aws" }.fail()
|
NotSupported { name: "aws" }.fail()
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ use bytes::Bytes;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
|
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
use std::fmt::Formatter;
|
use std::{fmt::Formatter, num::NonZeroUsize};
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
/// Universal API to multiple object store services.
|
/// Universal API to multiple object store services.
|
||||||
|
@ -118,6 +118,7 @@ impl ObjectStore {
|
||||||
bucket_name: impl Into<String>,
|
bucket_name: impl Into<String>,
|
||||||
endpoint: Option<impl Into<String>>,
|
endpoint: Option<impl Into<String>>,
|
||||||
session_token: Option<impl Into<String>>,
|
session_token: Option<impl Into<String>>,
|
||||||
|
max_connections: NonZeroUsize,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let s3 = aws::new_s3(
|
let s3 = aws::new_s3(
|
||||||
access_key_id,
|
access_key_id,
|
||||||
|
@ -126,6 +127,7 @@ impl ObjectStore {
|
||||||
bucket_name,
|
bucket_name,
|
||||||
endpoint,
|
endpoint,
|
||||||
session_token,
|
session_token,
|
||||||
|
max_connections,
|
||||||
)?;
|
)?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
integration: ObjectStoreIntegration::AmazonS3(s3),
|
integration: ObjectStoreIntegration::AmazonS3(s3),
|
||||||
|
|
|
@ -397,7 +397,7 @@ def cargo_build_iox(debug=False, build_with_aws=True):
|
||||||
t = time.time()
|
t = time.time()
|
||||||
print('building IOx')
|
print('building IOx')
|
||||||
|
|
||||||
features = []
|
features = ['kafka']
|
||||||
if build_with_aws:
|
if build_with_aws:
|
||||||
features.append('aws')
|
features.append('aws')
|
||||||
features = ','.join(features)
|
features = ','.join(features)
|
||||||
|
|
|
@ -25,3 +25,6 @@ workspace-hack = { path = "../workspace-hack"}
|
||||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||||
regex = "1"
|
regex = "1"
|
||||||
tokio = { version = "1.13", features = ["macros", "parking_lot"] }
|
tokio = { version = "1.13", features = ["macros", "parking_lot"] }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
kafka = ["write_buffer/kafka"]
|
||||||
|
|
|
@ -65,5 +65,8 @@ test_helpers = { path = "../test_helpers" }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
|
|
||||||
# Enable features for benchmarking
|
# Enable features for benchmarking
|
||||||
bench = ["mutable_buffer/nocache"]
|
bench = ["mutable_buffer/nocache"]
|
||||||
|
|
||||||
|
kafka = ["write_buffer/kafka"]
|
||||||
|
|
|
@ -675,6 +675,37 @@ impl Db {
|
||||||
fut.await.context(TaskCancelled)?.context(LifecycleError)
|
fut.await.context(TaskCancelled)?.context(LifecycleError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all persisted chunks in this partition
|
||||||
|
/// Return error if the persisted chunks are not contiguous. This means
|
||||||
|
/// there are chunks in between those OS chunks are not yet persisted
|
||||||
|
pub fn compact_object_store_partition(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
table_name: &str,
|
||||||
|
partition_key: &str,
|
||||||
|
) -> Result<TaskTracker<Job>> {
|
||||||
|
// acquire partition read lock to get OS chunk ids
|
||||||
|
let partition = self.lockable_partition(table_name, partition_key)?;
|
||||||
|
let partition = partition.read();
|
||||||
|
let chunks = partition.chunks();
|
||||||
|
|
||||||
|
// Get all OS chunk IDs
|
||||||
|
let mut chunk_ids = vec![];
|
||||||
|
for chunk in chunks {
|
||||||
|
let chunk = chunk.read();
|
||||||
|
if chunk.is_persisted() {
|
||||||
|
chunk_ids.push(chunk.id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// drop partition lock
|
||||||
|
partition.into_data();
|
||||||
|
|
||||||
|
// Compact all the OS chunks
|
||||||
|
// Error will return if those OS chunks are not contiguous which means
|
||||||
|
// a chunk in between those OS chunks are not yet persisted
|
||||||
|
self.compact_object_store_chunks(table_name, partition_key, chunk_ids)
|
||||||
|
}
|
||||||
|
|
||||||
/// Compact all provided persisted chunks
|
/// Compact all provided persisted chunks
|
||||||
pub fn compact_object_store_chunks(
|
pub fn compact_object_store_chunks(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
@ -2135,7 +2166,6 @@ mod tests {
|
||||||
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
|
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
|
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
@ -690,7 +690,8 @@ mod tests {
|
||||||
let partition = partition.upgrade();
|
let partition = partition.upgrade();
|
||||||
let chunk1 = chunks[0].write();
|
let chunk1 = chunks[0].write();
|
||||||
let chunk2 = chunks[1].write();
|
let chunk2 = chunks[1].write();
|
||||||
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2])
|
// Provide the chunk ids in reverse contiguous order to see if we handle it well
|
||||||
|
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk2, chunk1])
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.1
|
.1
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -470,6 +470,15 @@ impl Server {
|
||||||
let mut state = self.shared.state.write();
|
let mut state = self.shared.state.write();
|
||||||
let startup = match &**state {
|
let startup = match &**state {
|
||||||
ServerState::Startup(startup) => startup.clone(),
|
ServerState::Startup(startup) => startup.clone(),
|
||||||
|
state
|
||||||
|
if state
|
||||||
|
.server_id()
|
||||||
|
.map(|existing| existing == server_id)
|
||||||
|
.unwrap_or_default() =>
|
||||||
|
{
|
||||||
|
// already set to same ID
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
_ => return Err(Error::IdAlreadySet),
|
_ => return Err(Error::IdAlreadySet),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2459,4 +2468,20 @@ mod tests {
|
||||||
])
|
])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn set_server_id_twice() {
|
||||||
|
test_helpers::maybe_start_logging();
|
||||||
|
let server = make_server(make_application());
|
||||||
|
|
||||||
|
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||||
|
server.wait_for_init().await.unwrap();
|
||||||
|
|
||||||
|
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||||
|
|
||||||
|
assert_error!(
|
||||||
|
server.set_id(ServerId::try_from(2).unwrap()),
|
||||||
|
Error::IdAlreadySet
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ observability_deps = { path = "../observability_deps" }
|
||||||
parking_lot = "0.11.2"
|
parking_lot = "0.11.2"
|
||||||
pin-project = "1.0"
|
pin-project = "1.0"
|
||||||
prost = "0.8"
|
prost = "0.8"
|
||||||
rdkafka = "0.28.0"
|
rdkafka = { version = "0.28.0", optional = true }
|
||||||
time = { path = "../time" }
|
time = { path = "../time" }
|
||||||
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
|
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
|
||||||
tokio-util = "0.6.9"
|
tokio-util = "0.6.9"
|
||||||
|
@ -28,6 +28,9 @@ trace_http = { path = "../trace_http" }
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
workspace-hack = { path = "../workspace-hack"}
|
workspace-hack = { path = "../workspace-hack"}
|
||||||
|
|
||||||
|
[features]
|
||||||
|
kafka = ["rdkafka"]
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3.1.0"
|
tempfile = "3.1.0"
|
||||||
|
|
||||||
|
|
|
@ -100,11 +100,13 @@ impl IoxHeaders {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the content type
|
/// Gets the content type
|
||||||
|
#[allow(dead_code)] // this function is only used in optionally-compiled kafka code
|
||||||
pub fn content_type(&self) -> ContentType {
|
pub fn content_type(&self) -> ContentType {
|
||||||
self.content_type
|
self.content_type
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the span context if any
|
/// Gets the span context if any
|
||||||
|
#[allow(dead_code)] // this function is only used in optionally-compiled kafka code
|
||||||
pub fn span_context(&self) -> Option<&SpanContext> {
|
pub fn span_context(&self) -> Option<&SpanContext> {
|
||||||
self.span_context.as_ref()
|
self.span_context.as_ref()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,24 +1,21 @@
|
||||||
|
use crate::{
|
||||||
|
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
|
||||||
|
file::{FileBufferConsumer, FileBufferProducer},
|
||||||
|
mock::{
|
||||||
|
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
|
||||||
|
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{btree_map::Entry, BTreeMap},
|
collections::{btree_map::Entry, BTreeMap},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
|
|
||||||
use time::TimeProvider;
|
use time::TimeProvider;
|
||||||
use trace::TraceCollector;
|
use trace::TraceCollector;
|
||||||
|
|
||||||
use crate::{
|
|
||||||
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
|
|
||||||
file::{FileBufferConsumer, FileBufferProducer},
|
|
||||||
kafka::{KafkaBufferConsumer, KafkaBufferProducer},
|
|
||||||
mock::{
|
|
||||||
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
|
|
||||||
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum WriteBufferConfig {
|
pub enum WriteBufferConfig {
|
||||||
Writing(Arc<dyn WriteBufferWriting>),
|
Writing(Arc<dyn WriteBufferWriting>),
|
||||||
|
@ -37,6 +34,7 @@ enum Mock {
|
||||||
pub struct WriteBufferConfigFactory {
|
pub struct WriteBufferConfigFactory {
|
||||||
mocks: RwLock<BTreeMap<String, Mock>>,
|
mocks: RwLock<BTreeMap<String, Mock>>,
|
||||||
time_provider: Arc<dyn TimeProvider>,
|
time_provider: Arc<dyn TimeProvider>,
|
||||||
|
#[allow(dead_code)] // this field is only used in optionally-compiled kafka code
|
||||||
metric_registry: Arc<metric::Registry>,
|
metric_registry: Arc<metric::Registry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,18 +106,7 @@ impl WriteBufferConfigFactory {
|
||||||
.await?;
|
.await?;
|
||||||
Arc::new(file_buffer) as _
|
Arc::new(file_buffer) as _
|
||||||
}
|
}
|
||||||
"kafka" => {
|
"kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
|
||||||
let kafka_buffer = KafkaBufferProducer::new(
|
|
||||||
&cfg.connection,
|
|
||||||
db_name,
|
|
||||||
&cfg.connection_config,
|
|
||||||
cfg.creation_config.as_ref(),
|
|
||||||
Arc::clone(&self.time_provider),
|
|
||||||
&self.metric_registry,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Arc::new(kafka_buffer) as _
|
|
||||||
}
|
|
||||||
"mock" => match self.get_mock(&cfg.connection)? {
|
"mock" => match self.get_mock(&cfg.connection)? {
|
||||||
Mock::Normal(state) => {
|
Mock::Normal(state) => {
|
||||||
let mock_buffer = MockBufferForWriting::new(
|
let mock_buffer = MockBufferForWriting::new(
|
||||||
|
@ -142,6 +129,38 @@ impl WriteBufferConfigFactory {
|
||||||
Ok(writer)
|
Ok(writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "kafka")]
|
||||||
|
async fn kafka_buffer_producer(
|
||||||
|
&self,
|
||||||
|
db_name: &str,
|
||||||
|
cfg: &WriteBufferConnection,
|
||||||
|
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
|
||||||
|
let kafka_buffer = crate::kafka::KafkaBufferProducer::new(
|
||||||
|
&cfg.connection,
|
||||||
|
db_name,
|
||||||
|
&cfg.connection_config,
|
||||||
|
cfg.creation_config.as_ref(),
|
||||||
|
Arc::clone(&self.time_provider),
|
||||||
|
&self.metric_registry,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Arc::new(kafka_buffer) as _)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "kafka"))]
|
||||||
|
async fn kafka_buffer_producer(
|
||||||
|
&self,
|
||||||
|
_db_name: &str,
|
||||||
|
_cfg: &WriteBufferConnection,
|
||||||
|
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
|
||||||
|
Err(String::from(
|
||||||
|
"`WriteBufferWriting` of type `kafka` requested, but Kafka support was not included \
|
||||||
|
in this build by enabling the `kafka` feature",
|
||||||
|
)
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
|
/// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
|
||||||
pub async fn new_config_read(
|
pub async fn new_config_read(
|
||||||
&self,
|
&self,
|
||||||
|
@ -163,17 +182,8 @@ impl WriteBufferConfigFactory {
|
||||||
Box::new(file_buffer) as _
|
Box::new(file_buffer) as _
|
||||||
}
|
}
|
||||||
"kafka" => {
|
"kafka" => {
|
||||||
let kafka_buffer = KafkaBufferConsumer::new(
|
self.kafka_buffer_consumer(server_id, db_name, trace_collector, cfg)
|
||||||
&cfg.connection,
|
.await?
|
||||||
server_id,
|
|
||||||
db_name,
|
|
||||||
&cfg.connection_config,
|
|
||||||
cfg.creation_config.as_ref(),
|
|
||||||
trace_collector,
|
|
||||||
&self.metric_registry,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Box::new(kafka_buffer) as _
|
|
||||||
}
|
}
|
||||||
"mock" => match self.get_mock(&cfg.connection)? {
|
"mock" => match self.get_mock(&cfg.connection)? {
|
||||||
Mock::Normal(state) => {
|
Mock::Normal(state) => {
|
||||||
|
@ -193,21 +203,52 @@ impl WriteBufferConfigFactory {
|
||||||
|
|
||||||
Ok(reader)
|
Ok(reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "kafka")]
|
||||||
|
async fn kafka_buffer_consumer(
|
||||||
|
&self,
|
||||||
|
server_id: ServerId,
|
||||||
|
db_name: &str,
|
||||||
|
trace_collector: Option<&Arc<dyn TraceCollector>>,
|
||||||
|
cfg: &WriteBufferConnection,
|
||||||
|
) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
|
||||||
|
let kafka_buffer = crate::kafka::KafkaBufferConsumer::new(
|
||||||
|
&cfg.connection,
|
||||||
|
server_id,
|
||||||
|
db_name,
|
||||||
|
&cfg.connection_config,
|
||||||
|
cfg.creation_config.as_ref(),
|
||||||
|
trace_collector,
|
||||||
|
&self.metric_registry,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Box::new(kafka_buffer) as _)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "kafka"))]
|
||||||
|
async fn kafka_buffer_consumer(
|
||||||
|
&self,
|
||||||
|
_server_id: ServerId,
|
||||||
|
_db_name: &str,
|
||||||
|
_trace_collector: Option<&Arc<dyn TraceCollector>>,
|
||||||
|
_cfg: &WriteBufferConnection,
|
||||||
|
) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
|
||||||
|
Err(String::from(
|
||||||
|
"`WriteBufferReading` of type `kafka` requested, but Kafka support was not included \
|
||||||
|
in this build by enabling the `kafka` feature",
|
||||||
|
)
|
||||||
|
.into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{convert::TryFrom, num::NonZeroU32};
|
|
||||||
|
|
||||||
use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
|
|
||||||
use tempfile::TempDir;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
|
|
||||||
mock::MockBufferSharedState,
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};
|
||||||
|
use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
|
||||||
|
use std::{convert::TryFrom, num::NonZeroU32};
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_writing_file() {
|
async fn test_writing_file() {
|
||||||
|
@ -248,46 +289,6 @@ mod tests {
|
||||||
assert_eq!(conn.type_name(), "file");
|
assert_eq!(conn.type_name(), "file");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_writing_kafka() {
|
|
||||||
let conn = maybe_skip_kafka_integration!();
|
|
||||||
let factory = factory();
|
|
||||||
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
|
|
||||||
let cfg = WriteBufferConnection {
|
|
||||||
type_: "kafka".to_string(),
|
|
||||||
connection: conn,
|
|
||||||
creation_config: Some(WriteBufferCreationConfig::default()),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let conn = factory
|
|
||||||
.new_config_write(db_name.as_str(), &cfg)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(conn.type_name(), "kafka");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_reading_kafka() {
|
|
||||||
let conn = maybe_skip_kafka_integration!();
|
|
||||||
let factory = factory();
|
|
||||||
let server_id = ServerId::try_from(1).unwrap();
|
|
||||||
|
|
||||||
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
|
|
||||||
let cfg = WriteBufferConnection {
|
|
||||||
type_: "kafka".to_string(),
|
|
||||||
connection: conn,
|
|
||||||
creation_config: Some(WriteBufferCreationConfig::default()),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let conn = factory
|
|
||||||
.new_config_read(server_id, db_name.as_str(), None, &cfg)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(conn.type_name(), "kafka");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_writing_mock() {
|
async fn test_writing_mock() {
|
||||||
let factory = factory();
|
let factory = factory();
|
||||||
|
@ -297,7 +298,7 @@ mod tests {
|
||||||
let mock_name = "some_mock";
|
let mock_name = "some_mock";
|
||||||
factory.register_mock(mock_name.to_string(), state);
|
factory.register_mock(mock_name.to_string(), state);
|
||||||
|
|
||||||
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
let cfg = WriteBufferConnection {
|
let cfg = WriteBufferConnection {
|
||||||
type_: "mock".to_string(),
|
type_: "mock".to_string(),
|
||||||
connection: mock_name.to_string(),
|
connection: mock_name.to_string(),
|
||||||
|
@ -333,7 +334,7 @@ mod tests {
|
||||||
factory.register_mock(mock_name.to_string(), state);
|
factory.register_mock(mock_name.to_string(), state);
|
||||||
|
|
||||||
let server_id = ServerId::try_from(1).unwrap();
|
let server_id = ServerId::try_from(1).unwrap();
|
||||||
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
let cfg = WriteBufferConnection {
|
let cfg = WriteBufferConnection {
|
||||||
type_: "mock".to_string(),
|
type_: "mock".to_string(),
|
||||||
connection: mock_name.to_string(),
|
connection: mock_name.to_string(),
|
||||||
|
@ -366,7 +367,7 @@ mod tests {
|
||||||
let mock_name = "some_mock";
|
let mock_name = "some_mock";
|
||||||
factory.register_always_fail_mock(mock_name.to_string());
|
factory.register_always_fail_mock(mock_name.to_string());
|
||||||
|
|
||||||
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
let cfg = WriteBufferConnection {
|
let cfg = WriteBufferConnection {
|
||||||
type_: "mock".to_string(),
|
type_: "mock".to_string(),
|
||||||
connection: mock_name.to_string(),
|
connection: mock_name.to_string(),
|
||||||
|
@ -444,4 +445,99 @@ mod tests {
|
||||||
let registry = Arc::new(metric::Registry::new());
|
let registry = Arc::new(metric::Registry::new());
|
||||||
WriteBufferConfigFactory::new(time, registry)
|
WriteBufferConfigFactory::new(time, registry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "kafka")]
|
||||||
|
mod kafka {
|
||||||
|
use super::*;
|
||||||
|
use crate::maybe_skip_kafka_integration;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_writing_kafka() {
|
||||||
|
let conn = maybe_skip_kafka_integration!();
|
||||||
|
let factory = factory();
|
||||||
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
|
let cfg = WriteBufferConnection {
|
||||||
|
type_: "kafka".to_string(),
|
||||||
|
connection: conn,
|
||||||
|
creation_config: Some(WriteBufferCreationConfig::default()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = factory
|
||||||
|
.new_config_write(db_name.as_str(), &cfg)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(conn.type_name(), "kafka");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_reading_kafka() {
|
||||||
|
let conn = maybe_skip_kafka_integration!();
|
||||||
|
let factory = factory();
|
||||||
|
let server_id = ServerId::try_from(1).unwrap();
|
||||||
|
|
||||||
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
|
let cfg = WriteBufferConnection {
|
||||||
|
type_: "kafka".to_string(),
|
||||||
|
connection: conn,
|
||||||
|
creation_config: Some(WriteBufferCreationConfig::default()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = factory
|
||||||
|
.new_config_read(server_id, db_name.as_str(), None, &cfg)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(conn.type_name(), "kafka");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "kafka"))]
|
||||||
|
mod no_kafka {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn writing_to_kafka_without_kafka_feature_returns_error() {
|
||||||
|
let factory = factory();
|
||||||
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
|
let cfg = WriteBufferConnection {
|
||||||
|
type_: "kafka".to_string(),
|
||||||
|
creation_config: Some(WriteBufferCreationConfig::default()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let err = factory
|
||||||
|
.new_config_write(db_name.as_str(), &cfg)
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
assert_eq!(
|
||||||
|
err.to_string(),
|
||||||
|
"`WriteBufferWriting` of type `kafka` requested, but Kafka support was not \
|
||||||
|
included in this build by enabling the `kafka` feature"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn reading_from_kafka_without_kafka_feature_returns_error() {
|
||||||
|
let factory = factory();
|
||||||
|
let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
|
||||||
|
let server_id = ServerId::try_from(1).unwrap();
|
||||||
|
let cfg = WriteBufferConnection {
|
||||||
|
type_: "kafka".to_string(),
|
||||||
|
creation_config: Some(WriteBufferCreationConfig::default()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let err = factory
|
||||||
|
.new_config_read(server_id, db_name.as_str(), None, &cfg)
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
err.to_string(),
|
||||||
|
"`WriteBufferReading` of type `kafka` requested, but Kafka support was not \
|
||||||
|
included in this build by enabling the `kafka` feature"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,10 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
|
||||||
|
|
||||||
pub mod test_utils {
|
pub mod test_utils {
|
||||||
//! Generic tests for all write buffer implementations.
|
//! Generic tests for all write buffer implementations.
|
||||||
|
use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
|
||||||
|
use futures::{StreamExt, TryStreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::{BTreeMap, BTreeSet},
|
||||||
convert::TryFrom,
|
convert::TryFrom,
|
||||||
|
@ -106,14 +110,14 @@ pub mod test_utils {
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
|
|
||||||
use futures::{StreamExt, TryStreamExt};
|
|
||||||
use time::{Time, TimeProvider};
|
use time::{Time, TimeProvider};
|
||||||
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
|
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
/// Generated random topic name for testing.
|
||||||
|
pub fn random_topic_name() -> String {
|
||||||
|
format!("test_topic_{}", Uuid::new_v4())
|
||||||
|
}
|
||||||
|
|
||||||
/// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
|
/// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|
|
@ -1,11 +1,3 @@
|
||||||
use std::{
|
|
||||||
collections::{BTreeMap, BTreeSet},
|
|
||||||
convert::{TryFrom, TryInto},
|
|
||||||
num::NonZeroU32,
|
|
||||||
sync::Arc,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
|
use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
|
||||||
|
@ -23,15 +15,13 @@ use rdkafka::{
|
||||||
util::Timeout,
|
util::Timeout,
|
||||||
ClientConfig, ClientContext, Message, Offset, TopicPartitionList,
|
ClientConfig, ClientContext, Message, Offset, TopicPartitionList,
|
||||||
};
|
};
|
||||||
|
use std::{
|
||||||
use data_types::{
|
collections::{BTreeMap, BTreeSet},
|
||||||
sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
|
convert::{TryFrom, TryInto},
|
||||||
|
num::NonZeroU32,
|
||||||
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
use dml::{DmlMeta, DmlOperation};
|
|
||||||
use observability_deps::tracing::{debug, info};
|
|
||||||
use time::{Time, TimeProvider};
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
use trace::TraceCollector;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
codec::{ContentType, IoxHeaders},
|
codec::{ContentType, IoxHeaders},
|
||||||
|
@ -40,6 +30,14 @@ use crate::{
|
||||||
WriteBufferWriting, WriteStream,
|
WriteBufferWriting, WriteStream,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use data_types::{
|
||||||
|
sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
|
||||||
|
};
|
||||||
|
use dml::{DmlMeta, DmlOperation};
|
||||||
|
use observability_deps::tracing::{debug, info};
|
||||||
|
use time::{Time, TimeProvider};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use trace::TraceCollector;
|
||||||
|
|
||||||
/// Default timeout supplied to rdkafka client for kafka operations.
|
/// Default timeout supplied to rdkafka client for kafka operations.
|
||||||
///
|
///
|
||||||
|
@ -745,12 +743,9 @@ impl ClientContext for ClientContextImpl {
|
||||||
impl ConsumerContext for ClientContextImpl {}
|
impl ConsumerContext for ClientContextImpl {}
|
||||||
|
|
||||||
pub mod test_utils {
|
pub mod test_utils {
|
||||||
use std::{collections::BTreeMap, time::Duration};
|
|
||||||
|
|
||||||
use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use super::admin_client;
|
use super::admin_client;
|
||||||
|
use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
|
||||||
|
use std::{collections::BTreeMap, time::Duration};
|
||||||
|
|
||||||
/// Get the testing Kafka connection string or return current scope.
|
/// Get the testing Kafka connection string or return current scope.
|
||||||
///
|
///
|
||||||
|
@ -829,37 +824,28 @@ pub mod test_utils {
|
||||||
let result = results.pop().expect("just checked the vector length");
|
let result = results.pop().expect("just checked the vector length");
|
||||||
result.unwrap();
|
result.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generated random topic name for testing.
|
|
||||||
pub fn random_kafka_topic() -> String {
|
|
||||||
format!("test_topic_{}", Uuid::new_v4())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Kafka tests (only run when in integration test mode and kafka is running).
|
/// Kafka tests (only run when in integration test mode and kafka is running).
|
||||||
/// see [`crate::maybe_skip_kafka_integration`] for more details.
|
/// see [`crate::maybe_skip_kafka_integration`] for more details.
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use super::{test_utils::kafka_sequencer_options, *};
|
||||||
|
use crate::{
|
||||||
|
codec::HEADER_CONTENT_TYPE,
|
||||||
|
core::test_utils::{
|
||||||
|
map_pop_first, perform_generic_tests, random_topic_name, set_pop_first,
|
||||||
|
write as write_to_writer, TestAdapter, TestContext,
|
||||||
|
},
|
||||||
|
maybe_skip_kafka_integration,
|
||||||
|
};
|
||||||
use std::{
|
use std::{
|
||||||
num::NonZeroU32,
|
num::NonZeroU32,
|
||||||
sync::atomic::{AtomicU32, Ordering},
|
sync::atomic::{AtomicU32, Ordering},
|
||||||
};
|
};
|
||||||
|
|
||||||
use time::TimeProvider;
|
use time::TimeProvider;
|
||||||
use trace::{RingBufferTraceCollector, TraceCollector};
|
use trace::{RingBufferTraceCollector, TraceCollector};
|
||||||
|
|
||||||
use crate::codec::HEADER_CONTENT_TYPE;
|
|
||||||
use crate::{
|
|
||||||
core::test_utils::{
|
|
||||||
map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
|
|
||||||
TestAdapter, TestContext,
|
|
||||||
},
|
|
||||||
kafka::test_utils::random_kafka_topic,
|
|
||||||
maybe_skip_kafka_integration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::{test_utils::kafka_sequencer_options, *};
|
|
||||||
|
|
||||||
struct KafkaTestAdapter {
|
struct KafkaTestAdapter {
|
||||||
conn: String,
|
conn: String,
|
||||||
}
|
}
|
||||||
|
@ -881,7 +867,7 @@ mod tests {
|
||||||
) -> Self::Context {
|
) -> Self::Context {
|
||||||
KafkaTestContext {
|
KafkaTestContext {
|
||||||
conn: self.conn.clone(),
|
conn: self.conn.clone(),
|
||||||
database_name: random_kafka_topic(),
|
database_name: random_topic_name(),
|
||||||
server_id_counter: AtomicU32::new(1),
|
server_id_counter: AtomicU32::new(1),
|
||||||
n_sequencers,
|
n_sequencers,
|
||||||
time_provider,
|
time_provider,
|
||||||
|
@ -964,7 +950,7 @@ mod tests {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn topic_create_twice() {
|
async fn topic_create_twice() {
|
||||||
let conn = maybe_skip_kafka_integration!();
|
let conn = maybe_skip_kafka_integration!();
|
||||||
let database_name = random_kafka_topic();
|
let database_name = random_topic_name();
|
||||||
|
|
||||||
create_kafka_topic(
|
create_kafka_topic(
|
||||||
&conn,
|
&conn,
|
||||||
|
|
|
@ -12,5 +12,8 @@ pub(crate) mod codec;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod core;
|
pub mod core;
|
||||||
pub mod file;
|
pub mod file;
|
||||||
|
|
||||||
|
#[cfg(feature = "kafka")]
|
||||||
pub mod kafka;
|
pub mod kafka;
|
||||||
|
|
||||||
pub mod mock;
|
pub mod mock;
|
||||||
|
|
Loading…
Reference in New Issue