Merge branch 'main' into ntran/grpc_compact_os_chunks

pull/24376/head
Nga Tran 2021-12-03 18:16:42 -05:00 committed by GitHub
commit 65660dace5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 350 additions and 89 deletions

View File

@ -178,7 +178,7 @@ jobs:
- run:
name: Run Kafka
# Sudo needed because data directory owned by root but container runs as unprivileged user
command: sudo rpk redpanda start
command: sudo /opt/redpanda/bin/redpanda --redpanda-cfg /etc/redpanda/redpanda.yaml --overprovisioned --smp 1 --memory 1G --reserve-memory 0M
background: true
- checkout
- rust_components

View File

@ -255,10 +255,10 @@ influxdb_iox database create company_sensors
```
Data can be stored in InfluxDB IOx by sending it in [line protocol] format to the `/api/v2/write` endpoint or using the CLI.
For example, here is a command that will send the data in the `tests/fixtures/lineproto/metrics.lp` file in this repository, assuming that you're running the server on the default port into the `company_sensors` database, you can use:
For example, here is a command that will send the data in the `test_fixtures/lineproto/metrics.lp` file in this repository, assuming that you're running the server on the default port into the `company_sensors` database, you can use:
```shell
influxdb_iox database write company_sensors tests/fixtures/lineproto/metrics.lp
influxdb_iox database write company_sensors test_fixtures/lineproto/metrics.lp
```
To query data stored in the `company_sensors` database:
@ -286,7 +286,7 @@ IOx maps `organization` and `bucket` pairs to databases named with the two parts
Here's an example using [`curl`] to send data into the `company_sensors` database using the InfluxDB 2.0 `/api/v2/write` API:
```shell
curl -v "http://127.0.0.1:8080/api/v2/write?org=company&bucket=sensors" --data-binary @tests/fixtures/lineproto/metrics.lp
curl -v "http://127.0.0.1:8080/api/v2/write?org=company&bucket=sensors" --data-binary @test_fixtures/lineproto/metrics.lp
```
[line protocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/

View File

@ -160,6 +160,15 @@ message ReleaseDatabaseResponse {
message ClaimDatabaseRequest {
bytes uuid = 1;
// Force this server to claim this database, even if it is
// ostensibly owned by another server.
//
// WARNING: ONLY do this if no other servers are writing to this
// database (for example, the data files have been copied
// somewhere). If another server is currently writing to this
// database, corruption will very likely occur
bool force = 2;
}
message ClaimDatabaseResponse {

View File

@ -179,6 +179,16 @@ struct Release {
struct Claim {
/// The UUID of the database to claim
uuid: Uuid,
/// Force this server to claim this database, even if it is
/// ostensibly owned by another server.
///
/// WARNING: ONLY do this if you are sure that no other servers
/// are writing to this database (for example, the data files have
/// been copied somewhere). If another server is currently writing
/// to this database, corruption will very likely occur
#[structopt(long)]
force: bool,
}
/// All possible subcommands for database
@ -337,7 +347,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
}
Command::Claim(command) => {
let mut client = management::Client::new(connection);
let db_name = client.claim_database(command.uuid).await?;
let db_name = client.claim_database(command.uuid, command.force).await?;
println!("Claimed database {}", db_name);
}
}

View File

@ -144,13 +144,13 @@ impl management_service_server::ManagementService for ManagementService {
&self,
request: Request<ClaimDatabaseRequest>,
) -> Result<Response<ClaimDatabaseResponse>, Status> {
let ClaimDatabaseRequest { uuid } = request.into_inner();
let ClaimDatabaseRequest { uuid, force } = request.into_inner();
let uuid = Uuid::from_slice(&uuid).scope("uuid")?;
let db_name = self
.server
.claim_database(uuid)
.claim_database(uuid, force)
.await
.map_err(default_server_error_handler)?;

View File

@ -327,11 +327,14 @@ async fn test_create_get_update_release_claim_database() {
format!("Resource database/{} not found", db_name)
);
client.claim_database(released_uuid).await.unwrap();
client.claim_database(released_uuid, false).await.unwrap();
client.get_database(&db_name, false).await.unwrap();
let err = client.claim_database(released_uuid).await.unwrap_err();
let err = client
.claim_database(released_uuid, false)
.await
.unwrap_err();
assert_contains!(
err.to_string(),
format!("Resource database_uuid/{} already exists", released_uuid)
@ -347,7 +350,10 @@ async fn test_create_get_update_release_claim_database() {
}
let unknown_uuid = Uuid::new_v4();
let err = client.claim_database(unknown_uuid).await.unwrap_err();
let err = client
.claim_database(unknown_uuid, false)
.await
.unwrap_err();
assert_contains!(
err.to_string(),
format!("Resource database_uuid/{} not found", unknown_uuid)
@ -362,7 +368,10 @@ async fn test_create_get_update_release_claim_database() {
assert_ne!(released_uuid, newly_created_uuid);
let err = client.claim_database(released_uuid).await.unwrap_err();
let err = client
.claim_database(released_uuid, false)
.await
.unwrap_err();
assert_contains!(
err.to_string(),
format!("Resource database/{} already exists", db_name)
@ -449,7 +458,7 @@ async fn claim_database() {
let deleted_uuid = client.release_database(&db_name, None).await.unwrap();
assert_eq!(created_uuid, deleted_uuid);
client.claim_database(deleted_uuid).await.unwrap();
client.claim_database(deleted_uuid, false).await.unwrap();
// Claimed database is back in this server's database list
assert_eq!(
@ -466,7 +475,10 @@ async fn claim_database() {
);
// Claiming the same database again is an error
let err = client.claim_database(deleted_uuid).await.unwrap_err();
let err = client
.claim_database(deleted_uuid, false)
.await
.unwrap_err();
assert_contains!(
err.to_string(),
format!("Resource database_uuid/{} already exists", deleted_uuid)

View File

@ -13,7 +13,7 @@ use generated_types::{
influxdata::iox::management::v1::{operation_metadata::Job, WipePreservedCatalog},
};
use predicates::prelude::*;
use std::{sync::Arc, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tempfile::TempDir;
use test_helpers::make_temp_file;
use uuid::Uuid;
@ -669,6 +669,209 @@ async fn claim_database() {
)));
}
#[tokio::test]
async fn force_claim_database() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = server_fixture.grpc_base();
let db_name = rand_name();
let db = &db_name;
// Create a database on the server
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("create")
.arg(db)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Created"));
// Release database returns the UUID
let stdout = String::from_utf8(
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("release")
.arg(db)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Released database {}",
db
)))
.get_output()
.stdout
.clone(),
)
.unwrap();
let db_uuid = stdout.lines().last().unwrap().trim();
// delete the owner file <dir>/dbs/<uuid>/owner.pb
let mut owner_file: PathBuf = server_fixture.dir().into();
owner_file.push("dbs");
owner_file.push(db_uuid);
owner_file.push("owner.pb");
println!("Deleting {:?}", owner_file);
Command::new("rm")
.arg(owner_file.to_string_lossy().to_string())
.assert()
.success();
// Claiming db will now not work (no owner file)
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains("owner.pb not found"));
// But does work when --force is supplied
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.arg("--force")
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Claimed database {}",
db_name
)));
}
#[tokio::test]
async fn migrate_database_files_from_one_server_to_another() {
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("set")
.arg("3113")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("wait-server-initialized")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Server initialized."));
let db_name = rand_name();
let db = &db_name;
// Create a database on one server
let stdout = String::from_utf8(
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("create")
.arg(db)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Created"))
.get_output()
.stdout
.clone(),
)
.unwrap();
let db_uuid = stdout.lines().last().unwrap().trim();
// figure out where the database lives and copy its data to a temporary directory,
// as you might copy data from remote object storage to local disk for debugging.
// Assume data layout is <dir>/dbs/<uuid>
let mut source_dir: PathBuf = server_fixture.dir().into();
source_dir.push("dbs");
source_dir.push(db_uuid);
let tmp_dir = TempDir::new().expect("making tmp dir");
let target_dir = tmp_dir.path();
println!("Copying data from {:?} to {:?}", source_dir, target_dir);
Command::new("cp")
.arg("-R")
.arg(source_dir.to_string_lossy().to_string())
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
// stop the first server (note this call blocks until the process stops)
std::mem::drop(server_fixture);
// Now start another server that can claim the database
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = server_fixture.grpc_base();
// copy the data from tmp_dir/<uuid> to the new server's location
let mut source_dir: PathBuf = tmp_dir.path().into();
source_dir.push(db_uuid);
let mut target_dir: PathBuf = server_fixture.dir().into();
target_dir.push("dbs");
println!("Copying data from {:?} to {:?}", source_dir, target_dir);
Command::new("cp")
.arg("-R")
.arg(source_dir.to_string_lossy().to_string())
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
// Claiming without --force doesn't work as owner.pb still record the other server owning it
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains(
"is already owned by the server with ID 3113",
));
// however with --force the owner.pb file is updated forcibly
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.arg("--force") // sudo make me a sandwich
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Claimed database {}",
db_name
)));
}
#[tokio::test]
async fn test_get_chunks() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;

View File

@ -194,12 +194,21 @@ impl Client {
}
/// Claim database
pub async fn claim_database(&mut self, uuid: Uuid) -> Result<String, Error> {
///
/// if `force` is true, forces the server to claim this database, even if it is
/// ostensibly owned by another server.
///
/// WARNING: If another server is currently writing to this
/// database, corruption will very likely occur.
pub async fn claim_database(&mut self, uuid: Uuid, force: bool) -> Result<String, Error> {
let uuid_bytes = uuid.as_bytes().to_vec();
let response = self
.inner
.claim_database(ClaimDatabaseRequest { uuid: uuid_bytes })
.claim_database(ClaimDatabaseRequest {
uuid: uuid_bytes,
force,
})
.await?;
Ok(response.into_inner().db_name)

View File

@ -25,9 +25,8 @@ Examples:
iox_data_generator -s spec.toml -o lp
# Generate data points and write to the server running at localhost:8080 with the provided org,
# bucket and authorization token, creating the bucket
iox_data_generator -s spec.toml -h localhost:8080 --org myorg --org_id 0000111100001111 \
--bucket mybucket --token mytoken --create
# bucket and authorization token
iox_data_generator -s spec.toml -h localhost:8080 --org myorg --bucket mybucket --token mytoken
# Generate data points for the 24 hours between midnight 2020-01-01 and 2020-01-02
iox_data_generator -s spec.toml -o lp --start 2020-01-01 --end 2020-01-02
@ -86,12 +85,6 @@ Logging:
.help("The organization name to write to")
.takes_value(true),
)
.arg(
Arg::with_name("ORG_ID")
.long("org_id")
.help("The 16-digit hex ID of the organization. Only needed if passing `--create`.")
.takes_value(true),
)
.arg(
Arg::with_name("BUCKET")
.long("bucket")
@ -124,11 +117,6 @@ Logging:
)
.takes_value(true),
)
.arg(
Arg::with_name("create")
.long("create")
.help("Create the bucket specified before sending points. Requires `--org_id`"),
)
.arg(Arg::with_name("continue").long("continue").help(
"Generate live data using the intervals from the spec after generating historical \
data. This option has no effect if you specify an end time.",
@ -188,13 +176,11 @@ Logging:
{
PointsWriterBuilder::new_file(line_protocol_filename)?
} else if let Some(host) = matches.value_of("HOST") {
let (host, org, bucket, token, create_bucket, org_id) = validate_api_arguments(
let (host, org, bucket, token) = validate_api_arguments(
host,
matches.value_of("ORG"),
matches.value_of("BUCKET"),
matches.value_of("TOKEN"),
matches.is_present("create"),
matches.value_of("ORG_ID"),
);
PointsWriterBuilder::new_api(
@ -202,8 +188,6 @@ Logging:
org,
bucket,
token,
create_bucket,
org_id,
matches.value_of("jaeger_debug_header"),
)
.await?
@ -251,15 +235,9 @@ fn validate_api_arguments<'a>(
org: Option<&'a str>,
bucket: Option<&'a str>,
token: Option<&'a str>,
create_bucket: bool,
org_id: Option<&'a str>,
) -> (&'a str, &'a str, &'a str, &'a str, bool, Option<&'a str>) {
) -> (&'a str, &'a str, &'a str, &'a str) {
let mut errors = vec![];
if create_bucket && org_id.is_none() {
panic!("When `--create` is specified, `--org_id` is required, but it was missing.");
}
if org.is_none() {
errors.push("`--org` is missing");
}
@ -272,14 +250,7 @@ fn validate_api_arguments<'a>(
if errors.is_empty() {
// These `unwrap`s are safe because otherwise errors wouldn't be empty
(
host,
org.unwrap(),
bucket.unwrap(),
token.unwrap(),
create_bucket,
org_id,
)
(host, org.unwrap(), bucket.unwrap(), token.unwrap())
} else {
panic!(
"When `--host` is specified, `--org`, `--bucket`, and `--token` are required, \

View File

@ -2,8 +2,8 @@
use crate::measurement::LineToGenerate;
use futures::stream;
use influxdb2_client::models::{PostBucketRequest, WriteDataPoint};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use influxdb2_client::models::WriteDataPoint;
use snafu::{ensure, ResultExt, Snafu};
#[cfg(test)]
use std::{
collections::BTreeMap,
@ -116,8 +116,6 @@ impl PointsWriterBuilder {
org: impl Into<String> + Send,
bucket: impl Into<String> + Send,
token: impl Into<String> + Send,
create_bucket: bool,
org_id: Option<&str>,
jaeger_debug: Option<&str>,
) -> Result<Self> {
let host = host.into();
@ -138,20 +136,6 @@ impl PointsWriterBuilder {
let org = org.into();
let bucket = bucket.into();
if create_bucket {
let org_id = org_id.context(OrgIdRequiredToCreateBucket)?.to_string();
let bucket = PostBucketRequest {
org_id,
name: bucket.clone(),
..Default::default()
};
client
.create_bucket(Some(bucket))
.await
.context(CantCreateBucket)?;
}
Ok(Self {
config: PointsWriterConfig::Api {
client,

View File

@ -252,15 +252,20 @@ impl Database {
Ok(uuid)
}
/// Create an claimed database without any state. Returns its location in object storage
/// for saving in the server config file.
/// Create an claimed database without any state. Returns its
/// location in object storage for saving in the server config
/// file.
///
/// if `force` is true, a missing owner info or owner info that is
/// for the wrong server id are ignored (do not cause errors)
pub async fn claim(
application: Arc<ApplicationState>,
db_name: &DatabaseName<'static>,
uuid: Uuid,
server_id: ServerId,
force: bool,
) -> Result<String, InitError> {
info!(%db_name, %uuid, "claiming database");
info!(%db_name, %uuid, %force, "claiming database");
let iox_object_store = IoxObjectStore::load(Arc::clone(application.object_store()), uuid)
.await
@ -268,15 +273,42 @@ impl Database {
let owner_info = fetch_owner_info(&iox_object_store)
.await
.context(FetchingOwnerInfo)?;
.context(FetchingOwnerInfo);
ensure!(
owner_info.id == 0,
CantClaimDatabaseCurrentlyOwned {
uuid,
server_id: owner_info.id
// try to recreate owner_info if force is specified
let owner_info = match owner_info {
Err(_) if force => {
warn!("Attempting to recreate missing owner info due to force");
let server_location =
IoxObjectStore::server_config_path(application.object_store(), server_id)
.to_string();
create_owner_info(server_id, server_location, &iox_object_store)
.await
.context(CreatingOwnerInfo)?;
fetch_owner_info(&iox_object_store)
.await
.context(FetchingOwnerInfo)
}
t => t,
}?;
if owner_info.id != 0 {
if !force {
return CantClaimDatabaseCurrentlyOwned {
uuid,
server_id: owner_info.id,
}
.fail();
} else {
warn!(
owner_id = owner_info.id,
"Ignoring owner info mismatch due to force"
);
}
}
let database_location = iox_object_store.root_path();
let server_location =
@ -1608,7 +1640,7 @@ mod tests {
.to_string();
let uuid = database.release().await.unwrap();
Database::claim(application, db_name, uuid, new_server_id)
Database::claim(application, db_name, uuid, new_server_id, false)
.await
.unwrap();

View File

@ -699,8 +699,8 @@ impl Server {
/// * No database with this UUID can be found
/// * There's already an active database with this name
/// * This database is already owned by this server
/// * This database is already owned by a different server
pub async fn claim_database(&self, uuid: Uuid) -> Result<DatabaseName<'static>> {
/// * This database is already owned by a different server (unless force is true)
pub async fn claim_database(&self, uuid: Uuid, force: bool) -> Result<DatabaseName<'static>> {
// Wait for exclusive access to mutate server state
let handle_fut = self.shared.state.read().freeze();
let handle = handle_fut.await;
@ -742,6 +742,7 @@ impl Server {
&db_name,
uuid,
server_id,
force,
)
.await
.context(CannotClaimDatabase)?;
@ -2079,7 +2080,7 @@ mod tests {
let released_uuid = server.release_database(&foo_db_name, None).await.unwrap();
// claim database by UUID
server.claim_database(released_uuid).await.unwrap();
server.claim_database(released_uuid, false).await.unwrap();
let claimed = server.database(&foo_db_name).unwrap();
claimed.wait_for_init().await.unwrap();
@ -2104,13 +2105,13 @@ mod tests {
server.wait_for_init().await.unwrap();
assert_error!(
server.claim_database(invalid_uuid).await,
server.claim_database(invalid_uuid, false).await,
Error::DatabaseUuidNotFound { .. },
);
}
#[tokio::test]
async fn cant_claim_database_owned_by_another_server() {
/// create servers (1 and 2) with a database on server 1
async fn make_2_servers() -> (Arc<Server>, Arc<Server>, DatabaseName<'static>, Uuid) {
let application = make_application();
let server_id1 = ServerId::try_from(1).unwrap();
let server_id2 = ServerId::try_from(2).unwrap();
@ -2132,19 +2133,49 @@ mod tests {
server2.set_id(server_id2).unwrap();
server2.wait_for_init().await.unwrap();
(server1, server2, foo_db_name, uuid)
}
#[tokio::test]
async fn cant_claim_database_owned_by_another_server() {
let (server1, server2, db_name, db_uuid) = make_2_servers().await;
// Attempting to claim on server 2 will fail
assert_error!(
server2.claim_database(uuid).await,
server2.claim_database(db_uuid, false).await,
Error::CannotClaimDatabase {
source: database::InitError::CantClaimDatabaseCurrentlyOwned { server_id, .. }
} if server_id == server_id1.get_u32()
} if server_id == server1.server_id().unwrap().get_u32()
);
// Have to release from server 1 first
server1.release_database(&foo_db_name, None).await.unwrap();
server1.release_database(&db_name, None).await.unwrap();
// Then claiming on server 2 will work
server2.claim_database(uuid).await.unwrap();
server2.claim_database(db_uuid, false).await.unwrap();
}
#[tokio::test]
async fn can_force_claim_database_owned_by_another_server() {
let (server1, server2, _db_name, db_uuid) = make_2_servers().await;
// shutdown server 1
server1.shutdown();
server1
.join()
.await
.expect("Server successfully terminated");
// Attempting to claim on server 2 will fail
assert_error!(
server2.claim_database(db_uuid, false).await,
Error::CannotClaimDatabase {
source: database::InitError::CantClaimDatabaseCurrentlyOwned { server_id, .. }
} if server_id == server1.server_id().unwrap().get_u32()
);
// Then claiming on server 2 with `force=true` will work
server2.claim_database(db_uuid, true).await.unwrap();
}
#[tokio::test]