refactor: remove methods from `ManagementService` that were moved to `DeploymentService`
Tests are covered by `deployment_api.rs`. Ref #2980.pull/24376/head
parent
6a4ddb357f
commit
dc6b44818d
|
@ -10,27 +10,6 @@ import "influxdata/iox/management/v1/chunk.proto";
|
|||
import "influxdata/iox/management/v1/partition.proto";
|
||||
|
||||
service ManagementService {
|
||||
// Get server ID.
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc GetServerId(GetServerIdRequest) returns (GetServerIdResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// Update server ID.
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc UpdateServerId(UpdateServerIdRequest) returns (UpdateServerIdResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// Set serving readiness.
|
||||
//
|
||||
// This call is deprecated, see <https://github.com/influxdata/influxdb_iox/issues/2980>.
|
||||
rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse) {
|
||||
option deprecated = true;
|
||||
};
|
||||
|
||||
// List all databases on this server.
|
||||
//
|
||||
// Roughly follows the <https://google.aip.dev/132> pattern, except we wrap the response
|
||||
|
@ -128,27 +107,6 @@ service ManagementService {
|
|||
rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse);
|
||||
}
|
||||
|
||||
message GetServerIdRequest {}
|
||||
|
||||
message GetServerIdResponse {
|
||||
// Must be non-zero
|
||||
uint32 id = 1;
|
||||
}
|
||||
|
||||
message UpdateServerIdRequest {
|
||||
// Must be non-zero
|
||||
uint32 id = 1;
|
||||
}
|
||||
|
||||
message UpdateServerIdResponse {}
|
||||
|
||||
message SetServingReadinessRequest {
|
||||
// If false, the IOx server will respond with UNAVAILABLE to all data plane requests.
|
||||
bool ready = 1;
|
||||
}
|
||||
|
||||
message SetServingReadinessResponse {}
|
||||
|
||||
message ListDatabasesRequest {
|
||||
// If true, returns only explicitly defined values. See additional
|
||||
// details on `GetDatabaseRequest`.
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
//! Implementation of command line option for manipulating and showing server
|
||||
//! config
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{
|
||||
num::NonZeroU32,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::commands::server_remote;
|
||||
use influxdb_iox_client::{connection::Connection, deployment, management};
|
||||
use structopt::StructOpt;
|
||||
use thiserror::Error;
|
||||
|
||||
|
@ -14,13 +18,13 @@ pub enum Error {
|
|||
RemoteError(#[from] server_remote::Error),
|
||||
|
||||
#[error("Error getting server ID: {0}")]
|
||||
GetServerIdError(#[from] GetServerIdError),
|
||||
GetServerIdError(#[from] deployment::GetServerIdError),
|
||||
|
||||
#[error("Error updating server ID: {0}")]
|
||||
UpdateServerIdError(#[from] UpdateServerIdError),
|
||||
UpdateServerIdError(#[from] deployment::UpdateServerIdError),
|
||||
|
||||
#[error("Error checking if databases are loded: {0}")]
|
||||
AreDatabasesLoadedError(#[from] GetServerStatusError),
|
||||
AreDatabasesLoadedError(#[from] management::GetServerStatusError),
|
||||
|
||||
#[error("Timeout waiting for databases to be loaded")]
|
||||
TimeoutDatabasesLoaded,
|
||||
|
@ -53,7 +57,7 @@ enum Command {
|
|||
#[derive(Debug, StructOpt)]
|
||||
struct Set {
|
||||
/// The server ID to set
|
||||
id: u32,
|
||||
id: NonZeroU32,
|
||||
}
|
||||
|
||||
/// Wait until server is initialized.
|
||||
|
@ -64,24 +68,22 @@ struct WaitSeverInitialized {
|
|||
timeout: u64,
|
||||
}
|
||||
|
||||
use influxdb_iox_client::{connection::Connection, management::*};
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
match config.command {
|
||||
Command::Set(command) => {
|
||||
let mut client = Client::new(connection);
|
||||
let mut client = deployment::Client::new(connection);
|
||||
client.update_server_id(command.id).await?;
|
||||
println!("Ok");
|
||||
Ok(())
|
||||
}
|
||||
Command::Get => {
|
||||
let mut client = Client::new(connection);
|
||||
let mut client = deployment::Client::new(connection);
|
||||
let id = client.get_server_id().await?;
|
||||
println!("{}", id);
|
||||
println!("{}", id.get());
|
||||
Ok(())
|
||||
}
|
||||
Command::WaitServerInitialized(command) => {
|
||||
let mut client = Client::new(connection);
|
||||
let mut client = management::Client::new(connection);
|
||||
let end = Instant::now() + Duration::from_secs(command.timeout);
|
||||
loop {
|
||||
let status = client.get_server_status().await?;
|
||||
|
|
|
@ -14,49 +14,17 @@ use uuid::Uuid;
|
|||
struct ManagementService<M: ConnectionManager> {
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
serving_readiness: ServingReadiness,
|
||||
}
|
||||
|
||||
use super::error::{
|
||||
default_database_error_handler, default_db_error_handler, default_server_error_handler,
|
||||
};
|
||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> management_service_server::ManagementService for ManagementService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
async fn get_server_id(
|
||||
&self,
|
||||
_: Request<GetServerIdRequest>,
|
||||
) -> Result<Response<GetServerIdResponse>, Status> {
|
||||
match self.server.server_id() {
|
||||
Some(id) => Ok(Response::new(GetServerIdResponse { id: id.get_u32() })),
|
||||
None => return Err(NotFound::default().into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_server_id(
|
||||
&self,
|
||||
request: Request<UpdateServerIdRequest>,
|
||||
) -> Result<Response<UpdateServerIdResponse>, Status> {
|
||||
let id =
|
||||
ServerId::try_from(request.get_ref().id).map_err(|_| FieldViolation::required("id"))?;
|
||||
|
||||
match self.server.set_id(id) {
|
||||
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
|
||||
Err(e @ Error::IdAlreadySet) => {
|
||||
return Err(FieldViolation {
|
||||
field: "id".to_string(),
|
||||
description: e.to_string(),
|
||||
}
|
||||
.into())
|
||||
}
|
||||
Err(e) => Err(default_server_error_handler(e)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_databases(
|
||||
&self,
|
||||
request: Request<ListDatabasesRequest>,
|
||||
|
@ -457,15 +425,6 @@ where
|
|||
Ok(Response::new(UnloadPartitionChunkResponse {}))
|
||||
}
|
||||
|
||||
async fn set_serving_readiness(
|
||||
&self,
|
||||
request: Request<SetServingReadinessRequest>,
|
||||
) -> Result<Response<SetServingReadinessResponse>, Status> {
|
||||
let SetServingReadinessRequest { ready } = request.into_inner();
|
||||
self.serving_readiness.set(ready.into());
|
||||
Ok(Response::new(SetServingReadinessResponse {}))
|
||||
}
|
||||
|
||||
async fn get_server_status(
|
||||
&self,
|
||||
_request: Request<GetServerStatusRequest>,
|
||||
|
@ -617,7 +576,6 @@ fn format_rules(provided_rules: Arc<ProvidedDatabaseRules>, omit_defaults: bool)
|
|||
pub fn make_server<M>(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
serving_readiness: ServingReadiness,
|
||||
) -> management_service_server::ManagementServiceServer<
|
||||
impl management_service_server::ManagementService,
|
||||
>
|
||||
|
@ -627,6 +585,5 @@ where
|
|||
management_service_server::ManagementServiceServer::new(ManagementService {
|
||||
application,
|
||||
server,
|
||||
serving_readiness,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -49,7 +49,6 @@ where
|
|||
management::make_server(
|
||||
Arc::clone(&server_type.application),
|
||||
Arc::clone(&server_type.server),
|
||||
server_type.serving_readiness.clone(),
|
||||
)
|
||||
);
|
||||
add_service!(
|
||||
|
|
|
@ -8,9 +8,8 @@ use generated_types::{
|
|||
use influxdb_iox_client::{
|
||||
management::{Client, CreateDatabaseError},
|
||||
router::generated_types::{write_buffer_connection, WriteBufferConnection},
|
||||
write::WriteError,
|
||||
};
|
||||
use std::{fs::set_permissions, os::unix::fs::PermissionsExt};
|
||||
use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
use super::scenario::{
|
||||
|
@ -25,43 +24,8 @@ use crate::{
|
|||
use chrono::{DateTime, Utc};
|
||||
use std::convert::TryInto;
|
||||
use std::time::Instant;
|
||||
use tonic::Code;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_serving_readiness() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut mgmt_client = server_fixture.management_client();
|
||||
let mut write_client = server_fixture.write_client();
|
||||
|
||||
let name = "foo";
|
||||
let lp_data = "bar baz=1 10";
|
||||
|
||||
mgmt_client
|
||||
.update_server_id(42)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
server_fixture.wait_server_initialized().await;
|
||||
mgmt_client
|
||||
.create_database(DatabaseRules {
|
||||
name: name.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
mgmt_client.set_serving_readiness(false).await.unwrap();
|
||||
let err = write_client.write_lp(name, lp_data, 0).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
|
||||
"{}",
|
||||
&err
|
||||
);
|
||||
|
||||
mgmt_client.set_serving_readiness(true).await.unwrap();
|
||||
write_client.write_lp(name, lp_data, 0).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_update_remotes() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
|
@ -122,23 +86,6 @@ async fn test_list_update_remotes() {
|
|||
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2_UPDATED);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_get_writer_id() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
const TEST_ID: u32 = 42;
|
||||
|
||||
client
|
||||
.update_server_id(TEST_ID)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let got = client.get_server_id().await.expect("get ID failed");
|
||||
|
||||
assert_eq!(got.get(), TEST_ID);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_database_duplicate_name() {
|
||||
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||
|
@ -1231,24 +1178,28 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
|
|||
#[tokio::test]
|
||||
async fn test_get_server_status_ok() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut client = server_fixture.management_client();
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
let mut management_client = server_fixture.management_client();
|
||||
|
||||
// not initalized
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
let status = management_client.get_server_status().await.unwrap();
|
||||
assert!(!status.initialized);
|
||||
|
||||
// initialize
|
||||
client.update_server_id(42).await.expect("set ID failed");
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
// now initalized
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
let status = management_client.get_server_status().await.unwrap();
|
||||
assert!(status.initialized);
|
||||
|
||||
// create DBs
|
||||
let db_name1 = rand_name();
|
||||
let db_name2 = rand_name();
|
||||
client
|
||||
management_client
|
||||
.create_database(DatabaseRules {
|
||||
name: db_name1.clone(),
|
||||
..Default::default()
|
||||
|
@ -1256,7 +1207,7 @@ async fn test_get_server_status_ok() {
|
|||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
client
|
||||
management_client
|
||||
.create_database(DatabaseRules {
|
||||
name: db_name2.clone(),
|
||||
..Default::default()
|
||||
|
@ -1271,7 +1222,7 @@ async fn test_get_server_status_ok() {
|
|||
} else {
|
||||
(db_name2, db_name1)
|
||||
};
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
let status = management_client.get_server_status().await.unwrap();
|
||||
let names: Vec<_> = status
|
||||
.database_statuses
|
||||
.iter()
|
||||
|
@ -1298,7 +1249,8 @@ async fn test_get_server_status_ok() {
|
|||
#[tokio::test]
|
||||
async fn test_get_server_status_global_error() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut client = server_fixture.management_client();
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
let mut management_client = server_fixture.management_client();
|
||||
|
||||
// we need to "break" the object store AFTER the server was started, otherwise the server
|
||||
// process will exit immediately
|
||||
|
@ -1308,13 +1260,16 @@ async fn test_get_server_status_global_error() {
|
|||
set_permissions(server_fixture.dir(), permissions).unwrap();
|
||||
|
||||
// setup server
|
||||
client.update_server_id(42).await.expect("set ID failed");
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
|
||||
let check = async {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
let status = management_client.get_server_status().await.unwrap();
|
||||
if let Some(err) = status.error {
|
||||
assert!(dbg!(err.message)
|
||||
.starts_with("error getting server config from object storage:"));
|
||||
|
@ -1332,7 +1287,8 @@ async fn test_get_server_status_global_error() {
|
|||
#[tokio::test]
|
||||
async fn test_get_server_status_db_error() {
|
||||
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut client = server_fixture.management_client();
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
let mut management_client = server_fixture.management_client();
|
||||
|
||||
// Valid content of the owner.pb file
|
||||
let owner_info = OwnerInfo {
|
||||
|
@ -1377,11 +1333,14 @@ async fn test_get_server_status_db_error() {
|
|||
std::fs::write(path, encoded).unwrap();
|
||||
|
||||
// initialize
|
||||
client.update_server_id(42).await.expect("set ID failed");
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(42).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
// check for errors
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
let status = management_client.get_server_status().await.unwrap();
|
||||
assert!(status.initialized);
|
||||
assert_eq!(status.error, None);
|
||||
assert_eq!(status.database_statuses.len(), 1);
|
||||
|
|
|
@ -52,7 +52,7 @@ async fn test_server_id() {
|
|||
.arg(addr)
|
||||
.assert()
|
||||
.failure()
|
||||
.stderr(predicate::str::contains("id already set"));
|
||||
.stderr(predicate::str::contains("ID already set"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use crate::common::server_fixture::{ServerFixture, ServerType, TestConfig};
|
||||
use crate::end_to_end_cases::scenario::Scenario;
|
||||
use test_helpers::assert_contains;
|
||||
|
@ -7,9 +9,13 @@ pub async fn test_row_timestamp() {
|
|||
let test_config = TestConfig::new(ServerType::Database)
|
||||
.with_env("INFLUXDB_IOX_ROW_TIMESTAMP_METRICS", "system");
|
||||
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
let mut management_client = server_fixture.management_client();
|
||||
|
||||
management_client.update_server_id(1).await.unwrap();
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(1).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
let scenario = Scenario::new();
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
use arrow_util::assert_batches_eq;
|
||||
|
@ -196,12 +198,13 @@ async fn test_update_late_arrival() {
|
|||
async fn test_query_chunk_after_restart() {
|
||||
// fixtures
|
||||
let fixture = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut deployment_client = fixture.deployment_client();
|
||||
let mut management_client = fixture.management_client();
|
||||
let db_name = rand_name();
|
||||
|
||||
// set server ID
|
||||
let mut management_client = fixture.management_client();
|
||||
management_client
|
||||
.update_server_id(DEFAULT_SERVER_ID)
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(DEFAULT_SERVER_ID).unwrap())
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
fixture.wait_server_initialized().await;
|
||||
|
|
|
@ -590,15 +590,13 @@ pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSum
|
|||
|
||||
/// Creates a database with a broken catalog
|
||||
pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
|
||||
let server_id = DEFAULT_SERVER_ID;
|
||||
|
||||
let test_config =
|
||||
TestConfig::new(ServerType::Database).with_env("INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR", "no");
|
||||
|
||||
let fixture = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
fixture
|
||||
.management_client()
|
||||
.update_server_id(server_id)
|
||||
.deployment_client()
|
||||
.update_server_id(NonZeroU32::new(DEFAULT_SERVER_ID).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
fixture.wait_server_initialized().await;
|
||||
|
@ -647,15 +645,13 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
|
|||
|
||||
/// Creates a database that cannot be replayed
|
||||
pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> ServerFixture {
|
||||
let server_id = DEFAULT_SERVER_ID;
|
||||
|
||||
let test_config =
|
||||
TestConfig::new(ServerType::Database).with_env("INFLUXDB_IOX_SKIP_REPLAY", "no");
|
||||
|
||||
let fixture = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
fixture
|
||||
.management_client()
|
||||
.update_server_id(server_id)
|
||||
.deployment_client()
|
||||
.update_server_id(NonZeroU32::new(DEFAULT_SERVER_ID).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
fixture.wait_server_initialized().await;
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::num::NonZeroU32;
|
||||
|
||||
use super::scenario::{collect_query, Scenario};
|
||||
use crate::common::{
|
||||
server_fixture::{ServerFixture, ServerType, TestConfig},
|
||||
|
@ -21,9 +23,12 @@ async fn setup() -> (UdpCapture, ServerFixture) {
|
|||
|
||||
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
|
||||
let mut management_client = server_fixture.management_client();
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
|
||||
management_client.update_server_id(1).await.unwrap();
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(1).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
(udp_capture, server_fixture)
|
||||
|
@ -127,9 +132,12 @@ pub async fn test_tracing_create_trace() {
|
|||
|
||||
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
|
||||
let mut management_client = server_fixture.management_client();
|
||||
let mut deployment_client = server_fixture.deployment_client();
|
||||
|
||||
management_client.update_server_id(1).await.unwrap();
|
||||
deployment_client
|
||||
.update_server_id(NonZeroU32::new(1).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
run_sql_query(&server_fixture).await;
|
||||
|
|
|
@ -13,7 +13,7 @@ use generated_types::influxdata::iox::management::v1::{
|
|||
node_group::Node, sink, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig,
|
||||
ShardConfig, Sink,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashMap, num::NonZeroU32};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write() {
|
||||
|
@ -100,11 +100,11 @@ async fn test_write() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_routed() {
|
||||
const TEST_ROUTER_ID: u32 = 1;
|
||||
let test_router_id = NonZeroU32::new(1).unwrap();
|
||||
|
||||
const TEST_TARGET_ID_1: u32 = 2;
|
||||
const TEST_TARGET_ID_2: u32 = 3;
|
||||
const TEST_TARGET_ID_3: u32 = 4;
|
||||
let test_target_id_1 = NonZeroU32::new(2).unwrap();
|
||||
let test_target_id_2 = NonZeroU32::new(3).unwrap();
|
||||
let test_target_id_3 = NonZeroU32::new(4).unwrap();
|
||||
|
||||
const TEST_REMOTE_ID_1: u32 = 2;
|
||||
const TEST_REMOTE_ID_2: u32 = 3;
|
||||
|
@ -115,17 +115,18 @@ async fn test_write_routed() {
|
|||
const TEST_SHARD_ID_3: u32 = 44;
|
||||
|
||||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
router_mgmt
|
||||
.update_server_id(TEST_ROUTER_ID)
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
router.wait_server_initialized().await;
|
||||
|
||||
let target_1 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_1_mgmt = target_1.management_client();
|
||||
target_1_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_1)
|
||||
let mut target_1_deployment = target_1.deployment_client();
|
||||
target_1_deployment
|
||||
.update_server_id(test_target_id_1)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_1.wait_server_initialized().await;
|
||||
|
@ -136,9 +137,9 @@ async fn test_write_routed() {
|
|||
.expect("set remote failed");
|
||||
|
||||
let target_2 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_2_mgmt = target_2.management_client();
|
||||
target_2_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_2)
|
||||
let mut target_2_deployment = target_2.deployment_client();
|
||||
target_2_deployment
|
||||
.update_server_id(test_target_id_2)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_2.wait_server_initialized().await;
|
||||
|
@ -149,9 +150,9 @@ async fn test_write_routed() {
|
|||
.expect("set remote failed");
|
||||
|
||||
let target_3 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_3_mgmt = target_3.management_client();
|
||||
target_3_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_3)
|
||||
let mut target_3_deployment = target_3.deployment_client();
|
||||
target_3_deployment
|
||||
.update_server_id(test_target_id_3)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_3.wait_server_initialized().await;
|
||||
|
@ -336,14 +337,15 @@ async fn test_write_routed() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_routed_errors() {
|
||||
const TEST_ROUTER_ID: u32 = 1;
|
||||
let test_router_id = NonZeroU32::new(1).unwrap();
|
||||
const TEST_REMOTE_ID: u32 = 2;
|
||||
const TEST_SHARD_ID: u32 = 42;
|
||||
|
||||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
router_mgmt
|
||||
.update_server_id(TEST_ROUTER_ID)
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
router.wait_server_initialized().await;
|
||||
|
@ -405,13 +407,14 @@ async fn test_write_routed_errors() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_dev_null() {
|
||||
const TEST_ROUTER_ID: u32 = 1;
|
||||
let test_router_id = NonZeroU32::new(1).unwrap();
|
||||
const TEST_SHARD_ID: u32 = 42;
|
||||
|
||||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
router_mgmt
|
||||
.update_server_id(TEST_ROUTER_ID)
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
router.wait_server_initialized().await;
|
||||
|
@ -471,28 +474,29 @@ async fn test_write_dev_null() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_routed_no_shard() {
|
||||
const TEST_ROUTER_ID: u32 = 1;
|
||||
let test_router_id = NonZeroU32::new(1).unwrap();
|
||||
|
||||
const TEST_TARGET_ID_1: u32 = 2;
|
||||
const TEST_TARGET_ID_2: u32 = 3;
|
||||
const TEST_TARGET_ID_3: u32 = 4;
|
||||
let test_target_id_1 = NonZeroU32::new(2).unwrap();
|
||||
let test_target_id_2 = NonZeroU32::new(3).unwrap();
|
||||
let test_target_id_3 = NonZeroU32::new(4).unwrap();
|
||||
|
||||
const TEST_REMOTE_ID_1: u32 = 2;
|
||||
const TEST_REMOTE_ID_2: u32 = 3;
|
||||
const TEST_REMOTE_ID_3: u32 = 4;
|
||||
|
||||
let router = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut router_deployment = router.deployment_client();
|
||||
let mut router_mgmt = router.management_client();
|
||||
router_mgmt
|
||||
.update_server_id(TEST_ROUTER_ID)
|
||||
router_deployment
|
||||
.update_server_id(test_router_id)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
router.wait_server_initialized().await;
|
||||
|
||||
let target_1 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_1_mgmt = target_1.management_client();
|
||||
target_1_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_1)
|
||||
let mut target_1_deployment = target_1.deployment_client();
|
||||
target_1_deployment
|
||||
.update_server_id(test_target_id_1)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_1.wait_server_initialized().await;
|
||||
|
@ -503,9 +507,9 @@ async fn test_write_routed_no_shard() {
|
|||
.expect("set remote failed");
|
||||
|
||||
let target_2 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_2_mgmt = target_2.management_client();
|
||||
target_2_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_2)
|
||||
let mut target_2_deployment = target_2.deployment_client();
|
||||
target_2_deployment
|
||||
.update_server_id(test_target_id_2)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_2.wait_server_initialized().await;
|
||||
|
@ -516,9 +520,9 @@ async fn test_write_routed_no_shard() {
|
|||
.expect("set remote failed");
|
||||
|
||||
let target_3 = ServerFixture::create_single_use(ServerType::Database).await;
|
||||
let mut target_3_mgmt = target_3.management_client();
|
||||
target_3_mgmt
|
||||
.update_server_id(TEST_TARGET_ID_3)
|
||||
let mut target_3_deployment = target_3.deployment_client();
|
||||
target_3_deployment
|
||||
.update_server_id(test_target_id_3)
|
||||
.await
|
||||
.expect("set ID failed");
|
||||
target_3.wait_server_initialized().await;
|
||||
|
|
|
@ -15,7 +15,7 @@ use influxdb_iox_client::{
|
|||
management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError},
|
||||
write::WriteError,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::{num::NonZeroU32, sync::Arc};
|
||||
use tempfile::TempDir;
|
||||
use test_helpers::assert_contains;
|
||||
use time::SystemProvider;
|
||||
|
@ -312,8 +312,8 @@ pub async fn test_cross_write_buffer_tracing() {
|
|||
// create producer server
|
||||
let server_write = ServerFixture::create_single_use_with_config(test_config.clone()).await;
|
||||
server_write
|
||||
.management_client()
|
||||
.update_server_id(1)
|
||||
.deployment_client()
|
||||
.update_server_id(NonZeroU32::new(1).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
server_write.wait_server_initialized().await;
|
||||
|
@ -335,8 +335,8 @@ pub async fn test_cross_write_buffer_tracing() {
|
|||
// create consumer DB
|
||||
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
|
||||
server_read
|
||||
.management_client()
|
||||
.update_server_id(2)
|
||||
.deployment_client()
|
||||
.update_server_id(NonZeroU32::new(2).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
server_read.wait_server_initialized().await;
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
|||
google::{longrunning::IoxOperation, FieldViolation},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use std::{convert::TryInto, num::NonZeroU32};
|
||||
use std::convert::TryInto;
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -463,35 +463,6 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Set the server's ID.
|
||||
pub async fn update_server_id(&mut self, id: u32) -> Result<(), UpdateServerIdError> {
|
||||
self.inner
|
||||
.update_server_id(UpdateServerIdRequest { id })
|
||||
.await
|
||||
.map_err(UpdateServerIdError::ServerError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the server's ID.
|
||||
pub async fn get_server_id(&mut self) -> Result<NonZeroU32, GetServerIdError> {
|
||||
let response = self
|
||||
.inner
|
||||
.get_server_id(GetServerIdRequest {})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => GetServerIdError::NoServerId,
|
||||
_ => GetServerIdError::ServerError(status),
|
||||
})?;
|
||||
|
||||
let id = response
|
||||
.get_ref()
|
||||
.id
|
||||
.try_into()
|
||||
.map_err(|_| GetServerIdError::NoServerId)?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Check if databases are loaded and ready for read and write.
|
||||
pub async fn get_server_status(&mut self) -> Result<ServerStatus, GetServerStatusError> {
|
||||
let response = self
|
||||
|
@ -507,18 +478,6 @@ impl Client {
|
|||
Ok(server_status)
|
||||
}
|
||||
|
||||
/// Set serving readiness.
|
||||
pub async fn set_serving_readiness(
|
||||
&mut self,
|
||||
ready: bool,
|
||||
) -> Result<(), SetServingReadinessError> {
|
||||
self.inner
|
||||
.set_serving_readiness(SetServingReadinessRequest { ready })
|
||||
.await
|
||||
.map_err(SetServingReadinessError::ServerError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a new IOx database.
|
||||
pub async fn create_database(
|
||||
&mut self,
|
||||
|
|
Loading…
Reference in New Issue