fix: Prevent Catalog UUID races for new nodes (#26160)
When starting up a new cluster in Enterprise we might have multiple nodes starting at the same time. We might have an issue wherby we have multiple catalogs with different UUIDs in their in memory representation. For example: - Let's say we have node0 and node1 - node0 and node1 start at the same time and both check object storage to see if there is a catalog to load - They both see there is no catalog - They both create a new one by generating a UUID and persisting it to object storage - Whichever is written second is now the one with the correct UUID in their in memory representation while the other will not have the correct one until restarted likely This in practice isn't an issue today as Trevor notes in https://github.com/influxdata/influxdb_pro/issues/600, but it could be once we start using `--cluster-id` for licensing purposes. In order to prevent this we instead make the write to object storage use the Put mode. If it exists then the write will fail and the node that lost the race will instead just load the other's catalog. For example if node1 wins the race then node0 will load the catalog created by node1 and use that UUID instead. As this is hard to create a test for as it involves a race condition to happen I have not included one as we could never really be sure it was taken care of and we rely on the underlying object store we are writing to to handle this for us. It's also not likely to happen given this is only on a new cluster being initiated for the first time decreasing the chances of it occurring in the first place.chore/beta-build
parent
d2da058ded
commit
4e2cb630b3
|
@ -65,9 +65,19 @@ impl ObjectStoreCatalog {
|
|||
let catalog_uuid = Uuid::new_v4();
|
||||
info!(catalog_uuid = ?catalog_uuid, "catalog not found, creating a new one");
|
||||
let new_catalog = InnerCatalog::new(Arc::clone(&self.prefix), catalog_uuid);
|
||||
self.persist_catalog_checkpoint(&new_catalog.snapshot())
|
||||
.await?;
|
||||
Ok(new_catalog)
|
||||
match self
|
||||
.persist_catalog_checkpoint(&new_catalog.snapshot())
|
||||
.await?
|
||||
{
|
||||
PersistCatalogResult::Success => Ok(new_catalog),
|
||||
PersistCatalogResult::AlreadyExists => {
|
||||
self.load_catalog().await.map(|catalog| {
|
||||
catalog.expect(
|
||||
"the catalog should have already been persisted for us to load",
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -206,11 +216,13 @@ impl ObjectStoreCatalog {
|
|||
.await
|
||||
}
|
||||
|
||||
/// Persist the `CatalogSnapshot` as a checkpoint and ensure that the operation succeeds
|
||||
/// Persist the `CatalogSnapshot` as a checkpoint and ensure that the operation succeeds unless
|
||||
/// the file already exists. Object Storage should handle the concurrent requests in order and
|
||||
/// make sure that only one of them gets the win in terms of who writes first
|
||||
pub(crate) async fn persist_catalog_checkpoint(
|
||||
&self,
|
||||
snapshot: &CatalogSnapshot,
|
||||
) -> Result<()> {
|
||||
) -> Result<PersistCatalogResult> {
|
||||
let sequence = snapshot.sequence_number().get();
|
||||
let catalog_path = CatalogFilePath::checkpoint(&self.prefix);
|
||||
|
||||
|
@ -219,13 +231,26 @@ impl ObjectStoreCatalog {
|
|||
|
||||
// NOTE: not sure if this should be done in a loop, i.e., what error variants from
|
||||
// the object store would warrant a retry.
|
||||
match self.store.put(&catalog_path, content.clone().into()).await {
|
||||
match self
|
||||
.store
|
||||
.put_opts(
|
||||
&catalog_path,
|
||||
content.clone().into(),
|
||||
PutOptions {
|
||||
mode: object_store::PutMode::Create,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(put_result) => {
|
||||
info!(sequence, "persisted catalog checkpoint file");
|
||||
debug!(put_result = ?put_result, "object store PUT result");
|
||||
Ok(())
|
||||
Ok(PersistCatalogResult::Success)
|
||||
}
|
||||
Err(object_store::Error::AlreadyExists { .. }) => {
|
||||
Ok(PersistCatalogResult::AlreadyExists)
|
||||
}
|
||||
Err(object_store::Error::NotModified { .. }) => Ok(()),
|
||||
Err(err) => {
|
||||
error!(error = ?err, "failed to persist catalog checkpoint file");
|
||||
Err(err.into())
|
||||
|
|
Loading…
Reference in New Issue