fix: Don't hardcode the transition shard id
parent
97d90f5615
commit
473ce7a268
|
@ -5,7 +5,7 @@ pub(crate) mod name_resolver;
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, TableId};
|
||||
use data_types::{NamespaceId, ShardId, TableId};
|
||||
use dml::DmlOperation;
|
||||
use metric::U64Counter;
|
||||
use observability_deps::tracing::warn;
|
||||
|
@ -77,6 +77,8 @@ pub(crate) struct NamespaceData<O> {
|
|||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
|
||||
transition_shard_id: ShardId,
|
||||
}
|
||||
|
||||
impl<O> NamespaceData<O> {
|
||||
|
@ -88,6 +90,7 @@ impl<O> NamespaceData<O> {
|
|||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
metrics: &metric::Registry,
|
||||
transition_shard_id: ShardId,
|
||||
) -> Self {
|
||||
let table_count = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
|
@ -104,6 +107,7 @@ impl<O> NamespaceData<O> {
|
|||
table_count,
|
||||
partition_provider,
|
||||
post_write_observer,
|
||||
transition_shard_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,6 +167,7 @@ where
|
|||
Arc::clone(&self.namespace_name),
|
||||
Arc::clone(&self.partition_provider),
|
||||
Arc::clone(&self.post_write_observer),
|
||||
self.transition_shard_id,
|
||||
))
|
||||
});
|
||||
|
||||
|
@ -226,7 +231,7 @@ where
|
|||
mod tests {
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use data_types::{PartitionId, PartitionKey, ShardIndex};
|
||||
use data_types::{PartitionId, PartitionKey, ShardId, ShardIndex};
|
||||
use metric::{Attributes, Metric};
|
||||
|
||||
use super::*;
|
||||
|
@ -246,6 +251,7 @@ mod tests {
|
|||
const TABLE_ID: TableId = TableId::new(44);
|
||||
const NAMESPACE_NAME: &str = "platanos";
|
||||
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_init_table() {
|
||||
|
@ -266,6 +272,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
));
|
||||
|
||||
|
@ -276,6 +283,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
&metrics,
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Assert the namespace name was stored
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
|
||||
use mutable_batch::MutableBatch;
|
||||
use observability_deps::tracing::*;
|
||||
use schema::sort::SortKey;
|
||||
|
@ -89,6 +89,8 @@ pub(crate) struct PartitionData {
|
|||
/// The number of persist operations completed over the lifetime of this
|
||||
/// [`PartitionData`].
|
||||
completed_persistence_count: u64,
|
||||
|
||||
transition_shard_id: ShardId,
|
||||
}
|
||||
|
||||
impl PartitionData {
|
||||
|
@ -102,6 +104,7 @@ impl PartitionData {
|
|||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
sort_key: SortKeyState,
|
||||
transition_shard_id: ShardId,
|
||||
) -> Self {
|
||||
Self {
|
||||
partition_id: id,
|
||||
|
@ -115,6 +118,7 @@ impl PartitionData {
|
|||
persisting: VecDeque::with_capacity(1),
|
||||
started_persistence_count: BatchIdent::default(),
|
||||
completed_persistence_count: 0,
|
||||
transition_shard_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -293,6 +297,11 @@ impl PartitionData {
|
|||
&self.partition_key
|
||||
}
|
||||
|
||||
/// Return the transition_shard_id for this partition.
|
||||
pub(crate) fn transition_shard_id(&self) -> ShardId {
|
||||
self.transition_shard_id
|
||||
}
|
||||
|
||||
/// Return the [`NamespaceId`] this partition is a part of.
|
||||
pub(crate) fn namespace_id(&self) -> NamespaceId {
|
||||
self.namespace_id
|
||||
|
@ -344,6 +353,7 @@ mod tests {
|
|||
use crate::{buffer_tree::partition::resolver::SortKeyResolver, test_util::populate_catalog};
|
||||
|
||||
const PARTITION_ID: PartitionId = PartitionId::new(1);
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
lazy_static! {
|
||||
static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos");
|
||||
|
@ -369,6 +379,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// And no data should be returned when queried.
|
||||
|
@ -449,6 +460,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
assert!(p.get_query_data().is_none());
|
||||
|
@ -599,6 +611,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Perform the initial write.
|
||||
|
@ -777,6 +790,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Perform the initial write.
|
||||
|
@ -957,6 +971,7 @@ mod tests {
|
|||
TableName::from("platanos")
|
||||
})),
|
||||
starting_state,
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
|
||||
|
@ -1016,6 +1031,7 @@ mod tests {
|
|||
TableName::from("platanos")
|
||||
})),
|
||||
starting_state,
|
||||
shard_id,
|
||||
);
|
||||
|
||||
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
|
||||
|
@ -1040,6 +1056,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Perform out of order writes.
|
||||
|
@ -1087,6 +1104,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
assert!(p.mark_persisting().is_none());
|
||||
|
@ -1106,6 +1124,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
|
||||
|
@ -1132,6 +1151,7 @@ mod tests {
|
|||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
assert!(p.get_query_data().is_none());
|
||||
|
|
|
@ -2,7 +2,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
|
|||
|
||||
use async_trait::async_trait;
|
||||
use backoff::BackoffConfig;
|
||||
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||
use data_types::{
|
||||
NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId,
|
||||
};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use observability_deps::tracing::debug;
|
||||
use parking_lot::Mutex;
|
||||
|
@ -164,6 +166,7 @@ where
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> PartitionData {
|
||||
// Use the cached PartitionKey instead of the caller's partition_key,
|
||||
// instead preferring to reuse the already-shared Arc<str> in the cache.
|
||||
|
@ -193,6 +196,7 @@ where
|
|||
table_id,
|
||||
table_name,
|
||||
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
|
||||
transition_shard_id,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -206,6 +210,7 @@ where
|
|||
namespace_name,
|
||||
table_id,
|
||||
table_name,
|
||||
transition_shard_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -213,12 +218,11 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use data_types::ShardId;
|
||||
use iox_catalog::mem::MemCatalog;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
buffer_tree::partition::resolver::mock::MockPartitionProvider, TRANSITION_SHARD_ID,
|
||||
};
|
||||
use crate::buffer_tree::partition::resolver::mock::MockPartitionProvider;
|
||||
|
||||
const PARTITION_KEY: &str = "bananas";
|
||||
const PARTITION_ID: PartitionId = PartitionId::new(42);
|
||||
|
@ -226,6 +230,7 @@ mod tests {
|
|||
const NAMESPACE_NAME: &str = "ns-bananas";
|
||||
const TABLE_ID: TableId = TableId::new(3);
|
||||
const TABLE_NAME: &str = "platanos";
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
fn new_cache<P>(
|
||||
inner: MockPartitionProvider,
|
||||
|
@ -257,6 +262,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
let inner = MockPartitionProvider::default().with_partition(data);
|
||||
|
||||
|
@ -272,6 +278,7 @@ mod tests {
|
|||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -310,6 +317,7 @@ mod tests {
|
|||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -347,6 +355,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
));
|
||||
|
||||
let partition = Partition {
|
||||
|
@ -370,6 +379,7 @@ mod tests {
|
|||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -393,6 +403,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
));
|
||||
|
||||
let partition = Partition {
|
||||
|
@ -416,6 +427,7 @@ mod tests {
|
|||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ use std::sync::Arc;
|
|||
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types::{NamespaceId, Partition, PartitionKey, TableId};
|
||||
use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
|
@ -17,7 +17,6 @@ use crate::{
|
|||
table::TableName,
|
||||
},
|
||||
deferred_load::DeferredLoad,
|
||||
TRANSITION_SHARD_ID,
|
||||
};
|
||||
|
||||
/// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
|
||||
|
@ -43,12 +42,13 @@ impl CatalogPartitionResolver {
|
|||
&self,
|
||||
partition_key: PartitionKey,
|
||||
table_id: TableId,
|
||||
transition_shard_id: ShardId,
|
||||
) -> Result<Partition, iox_catalog::interface::Error> {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.create_or_get(partition_key, TRANSITION_SHARD_ID, table_id)
|
||||
.create_or_get(partition_key, transition_shard_id, table_id)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
@ -62,16 +62,18 @@ impl PartitionProvider for CatalogPartitionResolver {
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> PartitionData {
|
||||
debug!(
|
||||
%partition_key,
|
||||
%table_id,
|
||||
%table_name,
|
||||
%transition_shard_id,
|
||||
"upserting partition in catalog"
|
||||
);
|
||||
let p = Backoff::new(&self.backoff_config)
|
||||
.retry_all_errors("resolve partition", || {
|
||||
self.get(partition_key.clone(), table_id)
|
||||
self.get(partition_key.clone(), table_id, transition_shard_id)
|
||||
})
|
||||
.await
|
||||
.expect("retry forever");
|
||||
|
@ -87,6 +89,7 @@ impl PartitionProvider for CatalogPartitionResolver {
|
|||
table_id,
|
||||
table_name,
|
||||
SortKeyState::Provided(p.sort_key()),
|
||||
transition_shard_id,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -99,7 +102,6 @@ mod tests {
|
|||
use data_types::ShardIndex;
|
||||
|
||||
use super::*;
|
||||
use crate::TRANSITION_SHARD_ID;
|
||||
|
||||
const TABLE_NAME: &str = "bananas";
|
||||
const NAMESPACE_NAME: &str = "ns-bananas";
|
||||
|
@ -111,7 +113,7 @@ mod tests {
|
|||
let catalog: Arc<dyn Catalog> =
|
||||
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
let (_shard_id, namespace_id, table_id) = {
|
||||
let (shard_id, namespace_id, table_id) = {
|
||||
let mut repos = catalog.repositories().await;
|
||||
let t = repos.topics().create_or_get("platanos").await.unwrap();
|
||||
let q = repos.query_pools().create_or_get("platanos").await.unwrap();
|
||||
|
@ -126,7 +128,6 @@ mod tests {
|
|||
.create_or_get(&t, ShardIndex::new(0))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(shard.id, TRANSITION_SHARD_ID);
|
||||
|
||||
let table = repos
|
||||
.tables()
|
||||
|
@ -151,6 +152,7 @@ mod tests {
|
|||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
shard_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, PartitionKey, TableId};
|
||||
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use super::r#trait::PartitionProvider;
|
||||
|
@ -54,6 +54,7 @@ impl PartitionProvider for MockPartitionProvider {
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
_transition_shard_id: ShardId,
|
||||
) -> PartitionData {
|
||||
let p = self
|
||||
.partitions
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, PartitionKey, TableId};
|
||||
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
|
||||
|
||||
use crate::{
|
||||
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
|
||||
|
@ -24,6 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> PartitionData;
|
||||
}
|
||||
|
||||
|
@ -39,6 +40,7 @@ where
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> PartitionData {
|
||||
(**self)
|
||||
.get_partition(
|
||||
|
@ -47,6 +49,7 @@ where
|
|||
namespace_name,
|
||||
table_id,
|
||||
table_name,
|
||||
transition_shard_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -56,11 +59,13 @@ where
|
|||
mod tests {
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use data_types::PartitionId;
|
||||
use data_types::{PartitionId, ShardId};
|
||||
|
||||
use super::*;
|
||||
use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState};
|
||||
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_arc_impl() {
|
||||
let key = PartitionKey::from("bananas");
|
||||
|
@ -81,6 +86,7 @@ mod tests {
|
|||
table_id,
|
||||
Arc::clone(&table_name),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
let mock = Arc::new(MockPartitionProvider::default().with_partition(data));
|
||||
|
@ -92,6 +98,7 @@ mod tests {
|
|||
Arc::clone(&namespace_name),
|
||||
table_id,
|
||||
Arc::clone(&table_name),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(got.partition_id(), partition);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, TableId};
|
||||
use data_types::{NamespaceId, ShardId, TableId};
|
||||
use dml::DmlOperation;
|
||||
use metric::U64Counter;
|
||||
use parking_lot::Mutex;
|
||||
|
@ -97,6 +97,7 @@ pub(crate) struct BufferTree<O> {
|
|||
namespace_count: U64Counter,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
transition_shard_id: ShardId,
|
||||
}
|
||||
|
||||
impl<O> BufferTree<O>
|
||||
|
@ -110,6 +111,7 @@ where
|
|||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
metrics: Arc<metric::Registry>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> Self {
|
||||
let namespace_count = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
|
@ -126,6 +128,7 @@ where
|
|||
partition_provider,
|
||||
post_write_observer,
|
||||
namespace_count,
|
||||
transition_shard_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,6 +179,7 @@ where
|
|||
Arc::clone(&self.partition_provider),
|
||||
Arc::clone(&self.post_write_observer),
|
||||
&self.metrics,
|
||||
self.transition_shard_id,
|
||||
))
|
||||
});
|
||||
|
||||
|
@ -239,6 +243,7 @@ mod tests {
|
|||
const TABLE_NAME: &str = "bananas";
|
||||
const NAMESPACE_NAME: &str = "platanos";
|
||||
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_init_table() {
|
||||
|
@ -259,6 +264,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
));
|
||||
|
||||
|
@ -270,6 +276,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
&metrics,
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Assert the namespace name was stored
|
||||
|
@ -339,6 +346,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Write the provided DmlWrites
|
||||
|
@ -383,6 +391,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)],
|
||||
writes = [make_write_op(
|
||||
&PartitionKey::from("p1"),
|
||||
|
@ -418,6 +427,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
|
@ -431,6 +441,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
],
|
||||
writes = [
|
||||
|
@ -478,6 +489,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
|
@ -491,6 +503,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
],
|
||||
writes = [
|
||||
|
@ -537,6 +550,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
|
@ -550,6 +564,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)
|
||||
],
|
||||
writes = [
|
||||
|
@ -597,6 +612,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)],
|
||||
writes = [
|
||||
make_write_op(
|
||||
|
@ -646,6 +662,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
))
|
||||
.with_partition(PartitionData::new(
|
||||
PartitionId::new(0),
|
||||
|
@ -659,6 +676,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)),
|
||||
);
|
||||
|
||||
|
@ -671,6 +689,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::clone(&metrics),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Write data to partition p1, in table "bananas".
|
||||
|
@ -740,6 +759,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
))
|
||||
.with_partition(PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
|
@ -753,6 +773,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
))
|
||||
.with_partition(PartitionData::new(
|
||||
PartitionId::new(2),
|
||||
|
@ -766,6 +787,7 @@ mod tests {
|
|||
TableName::from("another_table")
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)),
|
||||
);
|
||||
|
||||
|
@ -776,6 +798,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::clone(&Arc::new(metric::Registry::default())),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
assert_eq!(buf.partitions().count(), 0);
|
||||
|
@ -849,6 +872,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
));
|
||||
|
||||
|
@ -859,6 +883,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Query the empty tree
|
||||
|
@ -932,6 +957,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
))
|
||||
.with_partition(PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
|
@ -945,6 +971,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
)),
|
||||
);
|
||||
|
||||
|
@ -955,6 +982,7 @@ mod tests {
|
|||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
// Write data to partition p1, in table "bananas".
|
||||
|
|
|
@ -5,7 +5,7 @@ pub(crate) mod name_resolver;
|
|||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
|
||||
use datafusion_util::MemoryStream;
|
||||
use mutable_batch::MutableBatch;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
|
@ -116,6 +116,7 @@ pub(crate) struct TableData<O> {
|
|||
partition_data: RwLock<DoubleRef>,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
transition_shard_id: ShardId,
|
||||
}
|
||||
|
||||
impl<O> TableData<O> {
|
||||
|
@ -136,6 +137,7 @@ impl<O> TableData<O> {
|
|||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
transition_shard_id: ShardId,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id,
|
||||
|
@ -145,6 +147,7 @@ impl<O> TableData<O> {
|
|||
partition_data: Default::default(),
|
||||
partition_provider,
|
||||
post_write_observer,
|
||||
transition_shard_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -218,6 +221,7 @@ where
|
|||
Arc::clone(&self.namespace_name),
|
||||
self.table_id,
|
||||
Arc::clone(&self.table_name),
|
||||
self.transition_shard_id,
|
||||
)
|
||||
.await;
|
||||
// Add the double-referenced partition to the map.
|
||||
|
@ -319,6 +323,7 @@ mod tests {
|
|||
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
|
||||
const PARTITION_KEY: &str = "platanos";
|
||||
const PARTITION_ID: PartitionId = PartitionId::new(0);
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_partition_double_ref() {
|
||||
|
@ -337,6 +342,7 @@ mod tests {
|
|||
TableName::from(TABLE_NAME)
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
TRANSITION_SHARD_ID,
|
||||
),
|
||||
));
|
||||
|
||||
|
@ -351,6 +357,7 @@ mod tests {
|
|||
})),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)
|
||||
|
|
|
@ -27,7 +27,7 @@ use crate::{
|
|||
server::grpc::GrpcDelegate,
|
||||
timestamp_oracle::TimestampOracle,
|
||||
wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
|
||||
TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
|
||||
TRANSITION_SHARD_INDEX,
|
||||
};
|
||||
|
||||
/// Acquire opaque handles to the Ingester RPC service implementations.
|
||||
|
@ -168,7 +168,8 @@ pub async fn new(
|
|||
.create_or_get("iox-shared")
|
||||
.await
|
||||
.expect("get topic");
|
||||
txn.shards()
|
||||
let transition_shard = txn
|
||||
.shards()
|
||||
.create_or_get(&topic, TRANSITION_SHARD_INDEX)
|
||||
.await
|
||||
.expect("create transition shard");
|
||||
|
@ -199,7 +200,7 @@ pub async fn new(
|
|||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.most_recent_n(40_000, &[TRANSITION_SHARD_ID])
|
||||
.most_recent_n(40_000, &[transition_shard.id])
|
||||
.await
|
||||
.map_err(InitError::PreWarmPartitions)?;
|
||||
|
||||
|
@ -242,6 +243,7 @@ pub async fn new(
|
|||
partition_provider,
|
||||
Arc::new(hot_partition_persister),
|
||||
Arc::clone(&metrics),
|
||||
transition_shard.id,
|
||||
));
|
||||
|
||||
// Initialise the WAL
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
missing_docs
|
||||
)]
|
||||
|
||||
use data_types::{ShardId, ShardIndex};
|
||||
use data_types::ShardIndex;
|
||||
|
||||
/// A macro to conditionally prepend `pub` to the inner tokens for benchmarking
|
||||
/// purposes, should the `benches` feature be enabled.
|
||||
|
@ -60,9 +60,9 @@ macro_rules! maybe_pub {
|
|||
};
|
||||
}
|
||||
|
||||
/// During the testing of ingester2, the catalog will require a ShardId for
|
||||
/// various operations. This is a const value for these occasions.
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(1);
|
||||
/// During the testing of ingester2, the catalog will require a ShardIndex for
|
||||
/// various operations. This is a const value for these occasions. Look up the ShardId for this
|
||||
/// ShardIndex when needed.
|
||||
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1);
|
||||
|
||||
/// Ingester initialisation methods & types.
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use backoff::Backoff;
|
||||
use data_types::{
|
||||
CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber,
|
||||
TableId,
|
||||
ShardId, TableId,
|
||||
};
|
||||
use iox_catalog::interface::get_table_schema_by_id;
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
|
@ -25,7 +25,6 @@ use crate::{
|
|||
},
|
||||
deferred_load::DeferredLoad,
|
||||
persist::compact::{compact_persisting_batch, CompactedStream},
|
||||
TRANSITION_SHARD_ID,
|
||||
};
|
||||
|
||||
use super::handle::Inner;
|
||||
|
@ -79,6 +78,8 @@ pub(super) struct Context {
|
|||
table_id: TableId,
|
||||
partition_id: PartitionId,
|
||||
|
||||
transition_shard_id: ShardId,
|
||||
|
||||
// The partition key for this partition
|
||||
partition_key: PartitionKey,
|
||||
|
||||
|
@ -162,6 +163,7 @@ impl Context {
|
|||
enqueued_at,
|
||||
dequeued_at: Instant::now(),
|
||||
permit,
|
||||
transition_shard_id: guard.transition_shard_id(),
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -233,7 +235,7 @@ impl Context {
|
|||
let iox_metadata = IoxMetadata {
|
||||
object_store_id,
|
||||
creation_timestamp: SystemProvider::new().now(),
|
||||
shard_id: TRANSITION_SHARD_ID,
|
||||
shard_id: self.transition_shard_id,
|
||||
namespace_id: self.namespace_id,
|
||||
namespace_name: Arc::clone(&*self.namespace_name.get().await),
|
||||
table_id: self.table_id,
|
||||
|
|
|
@ -430,7 +430,7 @@ mod tests {
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId};
|
||||
use dml::DmlOperation;
|
||||
use iox_catalog::mem::MemCatalog;
|
||||
use lazy_static::lazy_static;
|
||||
|
@ -460,6 +460,7 @@ mod tests {
|
|||
const TABLE_ID: TableId = TableId::new(2442);
|
||||
const TABLE_NAME: &str = "banana-report";
|
||||
const NAMESPACE_NAME: &str = "platanos";
|
||||
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
|
||||
|
||||
lazy_static! {
|
||||
static ref EXEC: Arc<Executor> = Arc::new(Executor::new_testing());
|
||||
|
@ -492,10 +493,12 @@ mod tests {
|
|||
TABLE_ID,
|
||||
Arc::clone(&TABLE_NAME_LOADER),
|
||||
sort_key,
|
||||
TRANSITION_SHARD_ID,
|
||||
)),
|
||||
),
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Default::default(),
|
||||
TRANSITION_SHARD_ID,
|
||||
);
|
||||
|
||||
buffer_tree
|
||||
|
|
|
@ -78,8 +78,6 @@ pub(crate) async fn populate_catalog(
|
|||
.unwrap()
|
||||
.id;
|
||||
|
||||
assert_eq!(shard_id, crate::TRANSITION_SHARD_ID);
|
||||
|
||||
(shard_id, ns_id, table_id)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue