feat: Only send the hash ID from ingester to querier if in catalog

The catalog ID shouldn't be used anywhere on its own, as the two fields
are combined into a TransitionPartitionId on the querier side.

This will enable us to skip querying the catalog when we're sure this
partition has a hash ID in the catalog.
pull/24376/head
Carol (Nichols || Goulding) 2023-07-21 14:07:27 -04:00
parent 308d7f3d4b
commit e07a48e350
12 changed files with 82 additions and 133 deletions
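For orientation before the per-file diffs: the identifier mapping this commit establishes looks roughly like the sketch below. The enum mirrors data_types::TransitionPartitionId, but the payload types (raw hash bytes, an i64 row ID) and the encode/decode helpers are illustrative stand-ins, not the actual API.

#[derive(Debug, Clone, PartialEq)]
enum TransitionPartitionId {
    Deterministic(Vec<u8>), // partition hash ID recorded in the catalog
    Deprecated(i64),        // old-style catalog row ID
}

// Ingester side: populate exactly one of the two proto metadata fields,
// as encode_partition() now does in the flight service diff below.
fn encode(id: &TransitionPartitionId) -> (Option<i64>, Option<Vec<u8>>) {
    match id {
        TransitionPartitionId::Deterministic(hash) => (None, Some(hash.clone())),
        TransitionPartitionId::Deprecated(row_id) => (Some(*row_id), None),
    }
}

// Querier side: fold the two optional fields back into one identifier,
// preferring the hash ID whenever it is present.
fn decode(partition_id: Option<i64>, partition_hash_id: Option<Vec<u8>>) -> TransitionPartitionId {
    match (partition_hash_id, partition_id) {
        (Some(hash), _) => TransitionPartitionId::Deterministic(hash),
        (None, Some(row_id)) => TransitionPartitionId::Deprecated(row_id),
        (None, None) => unreachable!("a partition response always carries one identifier"),
    }
}

fn main() {
    let id = TransitionPartitionId::Deterministic(vec![0xab; 32]);
    let (row_id, hash) = encode(&id);
    assert_eq!(decode(row_id, hash), id); // the pair round-trips losslessly
}

Since a Deterministic identifier can only exist once the catalog has assigned a hash ID, a querier that receives one can skip the catalog lookup mentioned above.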

View File

@ -449,7 +449,7 @@ mod tests {
use super::*;
use crate::{
buffer_tree::partition::resolver::SortKeyResolver,
-test_util::{populate_catalog, PartitionDataBuilder, ARBITRARY_CATALOG_PARTITION_ID},
+test_util::{populate_catalog, PartitionDataBuilder, ARBITRARY_TRANSITION_PARTITION_ID},
};
// Write some data and read it back from the buffer.
@ -473,7 +473,7 @@ mod tests {
let data = p
.get_query_data(&OwnedProjection::default())
.expect("should return data");
-assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
let expected = [
"+--------+--------+----------+--------------------------------+",
@ -496,7 +496,7 @@ mod tests {
let data = p
.get_query_data(&OwnedProjection::default())
.expect("should contain data");
-assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
let expected = [
"+--------+--------+----------+--------------------------------+",
@ -532,7 +532,7 @@ mod tests {
// And validate the data being persisted.
assert_eq!(
persisting_data.partition_id(),
-ARBITRARY_CATALOG_PARTITION_ID
+&*ARBITRARY_TRANSITION_PARTITION_ID
);
assert_eq!(persisting_data.record_batches().len(), 1);
let expected = [
@ -561,7 +561,7 @@ mod tests {
let data = p
.get_query_data(&OwnedProjection::default())
.expect("must have data");
-assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
assert_eq!(data.record_batches().len(), 2);
let expected = [
"+--------+--------+----------+--------------------------------+",
@ -589,7 +589,7 @@ mod tests {
let data = p
.get_query_data(&OwnedProjection::default())
.expect("must have data");
-assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
assert_eq!(data.record_batches().len(), 1);
let expected = [
"+--------+--------+---------+--------------------------------+",

View File

@ -238,7 +238,7 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{
partition_template::{test_table_partition_override, TemplatePart},
-PartitionHashId, PartitionId, PartitionKey,
+PartitionId, PartitionKey, TransitionPartitionId,
};
use datafusion::{
assert_batches_eq, assert_batches_sorted_eq,
@ -265,7 +265,7 @@ mod tests {
defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder,
ARBITRARY_CATALOG_PARTITION_ID, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME,
ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME,
-ARBITRARY_TABLE_PROVIDER,
+ARBITRARY_TABLE_PROVIDER, ARBITRARY_TRANSITION_PARTITION_ID,
},
};
@ -1307,10 +1307,7 @@ mod tests {
let partition = partitions.pop().unwrap();
// Ensure the partition hash ID is sent.
-assert_eq!(
-partition.partition_hash_id().unwrap(),
-&PartitionHashId::new(ARBITRARY_TABLE_ID, &ARBITRARY_PARTITION_KEY)
-);
+assert_eq!(partition.id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
// Perform the partition read
let batches = partition.into_record_batches();
@ -1383,6 +1380,9 @@ mod tests {
let partition = partitions.pop().unwrap();
// Ensure the partition hash ID is NOT sent.
-assert!(partition.partition_hash_id().is_none());
+assert_eq!(
+partition.id(),
+&TransitionPartitionId::Deprecated(ARBITRARY_CATALOG_PARTITION_ID),
+);
}
}

View File

@ -273,11 +273,10 @@ where
let partitions = self.partitions().into_iter().filter_map(move |p| {
let mut span = span.child("partition read");
-let (id, hash_id, completed_persistence_count, data, partition_key) = {
+let (id, completed_persistence_count, data, partition_key) = {
let mut p = p.lock();
(
-p.partition_id(),
-p.partition_hash_id().cloned(),
+p.transition_partition_id(),
p.completed_persistence_count(),
p.get_query_data(&projection),
p.partition_key().clone(),
@ -286,7 +285,7 @@ where
let ret = match data {
Some(data) => {
-assert_eq!(id, data.partition_id());
+assert_eq!(&id, data.partition_id());
// Potentially prune out this partition if the partition
// template & derived partition key can be used to match
@ -324,11 +323,10 @@ where
PartitionResponse::new(
data.into_record_batches(),
id,
-hash_id,
completed_persistence_count,
)
}
-None => PartitionResponse::new(vec![], id, hash_id, completed_persistence_count),
+None => PartitionResponse::new(vec![], id, completed_persistence_count),
};
span.ok("read partition data");

View File

@ -1,9 +1,6 @@
use std::sync::Arc;
-use data_types::{
-NamespaceId, ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, TableId,
-TransitionPartitionId,
-};
+use data_types::{NamespaceId, ParquetFileParams, PartitionKey, TableId, TransitionPartitionId};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use schema::sort::SortKey;
@ -68,8 +65,8 @@ impl PersistRequest {
)
}
-/// Return the partition ID of the persisting data.
-pub(super) fn partition_id(&self) -> PartitionId {
+/// Return the partition identifier of the persisting data.
+pub(super) fn partition_id(&self) -> &TransitionPartitionId {
self.data.partition_id()
}
}
@ -88,8 +85,7 @@ pub(super) struct Context {
/// IDs loaded from the partition at construction time.
namespace_id: NamespaceId,
table_id: TableId,
-partition_id: PartitionId,
-partition_hash_id: Option<PartitionHashId>,
+partition_id: TransitionPartitionId,
// The partition key for this partition
partition_key: PartitionKey,
@ -137,7 +133,7 @@ impl Context {
/// Locks the [`PartitionData`] in `req` to read various properties which
/// are then cached in the [`Context`].
pub(super) fn new(req: PersistRequest) -> Self {
-let partition_id = req.data.partition_id();
+let partition_id = req.data.partition_id().clone();
// Obtain the partition lock and load the immutable values that will be
// used during this persistence.
@ -153,7 +149,7 @@ impl Context {
let p = Arc::clone(&partition);
let guard = p.lock();
-assert_eq!(partition_id, guard.partition_id());
+assert_eq!(partition_id, guard.transition_partition_id());
Self {
partition,
@ -161,7 +157,6 @@ impl Context {
namespace_id: guard.namespace_id(),
table_id: guard.table_id(),
partition_id,
-partition_hash_id: guard.partition_hash_id().cloned(),
partition_key: guard.partition_key().clone(),
namespace_name: Arc::clone(guard.namespace_name()),
table: Arc::clone(guard.table()),
@ -292,16 +287,8 @@ impl Context {
self.table_id
}
-pub(super) fn partition_id(&self) -> PartitionId {
-self.partition_id
-}
-pub(super) fn partition_hash_id(&self) -> Option<PartitionHashId> {
-self.partition_hash_id.clone()
-}
-pub(super) fn transition_partition_id(&self) -> TransitionPartitionId {
-TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
+pub(super) fn partition_id(&self) -> &TransitionPartitionId {
+&self.partition_id
}
pub(super) fn partition_key(&self) -> &PartitionKey {
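Context now caches the already-built TransitionPartitionId instead of rebuilding it per call. For reference, the conversion the deleted transition_partition_id() helper relied on (data_types' From impl over the ID pair) behaves roughly like this sketch with stand-in types:

// Stand-ins for the data_types originals.
#[derive(Clone, Copy, Debug)]
struct PartitionId(i64);
#[derive(Clone, Debug)]
struct PartitionHashId(Vec<u8>);

#[derive(Debug)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(PartitionId),
}

// Prefer the hash ID when the catalog recorded one, else fall back to the
// deprecated catalog row ID.
impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId {
    fn from((id, hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self {
        match hash_id {
            Some(hash_id) => Self::Deterministic(hash_id.clone()),
            None => Self::Deprecated(id),
        }
    }
}

fn main() {
    let hash = PartitionHashId(vec![0xab; 32]);
    let id = TransitionPartitionId::from((PartitionId(42), Some(&hash)));
    println!("{id:?}"); // Deterministic(...)
}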

View File

@ -32,7 +32,7 @@ where
let data = p.lock().mark_persisting()?;
debug!(
-partition_id=data.partition_id().get(),
+partition_id=%data.partition_id(),
lock_wait=?Instant::now().duration_since(t),
"read data for persistence"
);
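The logging tweaks here and in the next file trade partition_id().get() (an i64 that tracing records directly as a field value) for the `%` sigil, which records via Display — needed now that the identifier is a TransitionPartitionId rather than a primitive. A minimal sketch, assuming the tracing and tracing-subscriber crates:

use std::fmt;
use tracing::debug; // the repo reaches tracing via observability_deps

struct TransitionPartitionId(u64); // stand-in; only Display matters here

impl fmt::Display for TransitionPartitionId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

fn enqueue(partition_id: &TransitionPartitionId) {
    // A bare `partition_id` field needs a primitive tracing Value (the old
    // `.get()` i64 qualified); `%` records the field via Display instead.
    debug!(%partition_id, "enqueuing persistence task");
}

fn main() {
    tracing_subscriber::fmt().init(); // assumed dependency, just for output
    enqueue(&TransitionPartitionId(42));
}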

View File

@ -312,7 +312,7 @@ impl PersistHandle {
fn assign_worker(&self, r: PersistRequest) {
debug!(
-partition_id = r.partition_id().get(),
+partition_id = %r.partition_id(),
"enqueue persist job to assigned worker"
);
@ -357,8 +357,8 @@ impl PersistQueue for PersistHandle {
partition: Arc<Mutex<PartitionData>>,
data: PersistingData,
) -> oneshot::Receiver<()> {
-let partition_id = data.partition_id().get();
-debug!(partition_id, "enqueuing persistence task");
+let partition_id = data.partition_id().clone();
+debug!(%partition_id, "enqueuing persistence task");
// Record a starting timestamp, and increment the number of persist jobs
// before waiting on the semaphore - this ensures the difference between
@ -435,7 +435,7 @@ impl PersistQueue for PersistHandle {
if let Some(new_sort_key) = adjust_sort_key_columns(&v, &data_primary_key).1 {
// This persist operation will require a sort key update.
trace!(
-partition_id,
+%partition_id,
old_sort_key = %v,
%new_sort_key,
"persist job will require sort key update"
@ -444,7 +444,7 @@ impl PersistQueue for PersistHandle {
} else {
// This persist operation will not require a sort key
// update.
debug!(partition_id, "enqueue persist job to global work queue");
debug!(%partition_id, "enqueue persist job to global work queue");
self.global_queue.send(r).await.expect("no persist workers");
}
}
@ -452,7 +452,7 @@ impl PersistQueue for PersistHandle {
// If no sort key is known (either because it was unresolved, or
// not yet set), the task must be serialised w.r.t other persist
// jobs for the same partition.
-trace!(partition_id, "persist job has no known sort key");
+trace!(%partition_id, "persist job has no known sort key");
self.assign_worker(r);
}
}
@ -499,9 +499,9 @@ mod tests {
tests::{assert_metric_counter, assert_metric_gauge},
},
test_util::{
-make_write_op, PartitionDataBuilder, ARBITRARY_CATALOG_PARTITION_ID,
-ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY,
-ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER,
+make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME,
+ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME,
+ARBITRARY_TABLE_PROVIDER, ARBITRARY_TRANSITION_PARTITION_ID,
},
};
@ -591,7 +591,7 @@ mod tests {
.expect("message was not found in either worker")
}
};
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
// Drop the message, and ensure the notification becomes inactive.
drop(msg);
@ -611,7 +611,7 @@ mod tests {
let msg = assigned_worker
.try_recv()
.expect("message was not found in either worker");
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
}
/// A test that ensures the correct destination of a partition that has no
@ -677,7 +677,7 @@ mod tests {
.expect("message was not found in either worker")
}
};
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
// Drop the message, and ensure the notification becomes inactive.
drop(msg);
@ -698,7 +698,7 @@ mod tests {
let msg = assigned_worker
.try_recv()
.expect("message was not found in either worker");
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
}
/// A test that ensures the correct destination of a partition that has an
@ -765,7 +765,7 @@ mod tests {
.expect("message was not found in either worker")
}
};
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
// Drop the message, and ensure the notification becomes inactive.
drop(msg);
@ -786,7 +786,7 @@ mod tests {
let msg = assigned_worker
.try_recv()
.expect("message was not found in either worker");
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
}
/// A test that a partition that does not require a sort key update is
@ -845,7 +845,7 @@ mod tests {
let msg = global_rx
.try_recv()
.expect("task should be in global queue");
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
// Drop the message, and ensure the notification becomes inactive.
drop(msg);
@ -866,7 +866,7 @@ mod tests {
let msg = global_rx
.try_recv()
.expect("task should be in global queue");
-assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID);
+assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID);
}
/// A test that a ensures tasks waiting to be enqueued (waiting on the

View File

@ -278,12 +278,7 @@ where
let pool = worker_state.exec.pool();
let (md, file_size) = worker_state
.store
-.upload(
-record_stream,
-&ctx.transition_partition_id(),
-&iox_metadata,
-pool,
-)
+.upload(record_stream, &ctx.partition_id(), &iox_metadata, pool)
.await
.expect("unexpected fatal persist error");
@ -376,11 +371,7 @@ where
let mut repos = catalog.repositories().await;
match repos
.partitions()
-.cas_sort_key(
-&ctx.transition_partition_id(),
-old_sort_key.clone(),
-&new_sort_key_str,
-)
+.cas_sort_key(&ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str)
.await
{
Ok(_) => ControlFlow::Break(Ok(())),

View File

@ -3,7 +3,7 @@
//! [`QueryResponse`]: super::response::QueryResponse
use arrow::record_batch::RecordBatch;
-use data_types::{PartitionHashId, PartitionId};
+use data_types::TransitionPartitionId;
/// Response data for a single partition.
#[derive(Debug)]
@ -12,10 +12,7 @@ pub(crate) struct PartitionResponse {
batches: Vec<RecordBatch>,
/// Partition ID.
-id: PartitionId,
-/// Partition hash ID, if stored in the database.
-partition_hash_id: Option<PartitionHashId>,
+id: TransitionPartitionId,
/// Count of persisted Parquet files for this partition by this ingester instance.
completed_persistence_count: u64,
@ -24,24 +21,18 @@ pub(crate) struct PartitionResponse {
impl PartitionResponse {
pub(crate) fn new(
data: Vec<RecordBatch>,
-id: PartitionId,
-partition_hash_id: Option<PartitionHashId>,
+id: TransitionPartitionId,
completed_persistence_count: u64,
) -> Self {
Self {
batches: data,
id,
-partition_hash_id,
completed_persistence_count,
}
}
-pub(crate) fn id(&self) -> PartitionId {
-self.id
-}
-pub(crate) fn partition_hash_id(&self) -> Option<&PartitionHashId> {
-self.partition_hash_id.as_ref()
+pub(crate) fn id(&self) -> &TransitionPartitionId {
+&self.id
}
pub(crate) fn completed_persistence_count(&self) -> u64 {
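Unlike the Copy, i64-backed PartitionId, a TransitionPartitionId owns its hash-ID bytes, so PartitionResponse hands out a reference and call sites clone only where an owned value is required (see `p.id().clone()` in the next file). A minimal sketch of that pattern with a stand-in type:

#[derive(Clone, Debug, PartialEq)]
enum TransitionPartitionId {
    Deterministic(Vec<u8>), // owns hash bytes, hence not Copy
    Deprecated(i64),
}

struct PartitionResponse {
    id: TransitionPartitionId,
}

impl PartitionResponse {
    // Borrow for assertions, comparisons, and logging...
    fn id(&self) -> &TransitionPartitionId {
        &self.id
    }
}

fn main() {
    let resp = PartitionResponse {
        id: TransitionPartitionId::Deprecated(2),
    };
    // ...and clone only where an owned value is actually moved, as the
    // response stream does before handing the ID to encode_partition().
    let owned = resp.id().clone();
    assert_eq!(&owned, resp.id());
}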

View File

@ -332,8 +332,7 @@ where
*this.partition_count += 1;
// Extract all the fields of the PartitionResponse
-let id = p.id();
-let hash_id = p.partition_hash_id().cloned();
+let id = p.id().clone();
let persist_count = p.completed_persistence_count();
// And wrap the underlying stream of RecordBatch for this
@ -346,12 +345,7 @@ where
this.record_batch_count
.fetch_add(data.len(), Ordering::Relaxed);
-Poll::Ready(Some(PartitionResponse::new(
-data,
-id,
-hash_id,
-persist_count,
-)))
+Poll::Ready(Some(PartitionResponse::new(data, id, persist_count)))
}
Poll::Ready(None) => {
// Record the wall clock timestamp of the stream end.
@ -435,12 +429,10 @@ mod tests {
make_batch, make_partition_stream,
query::mock_query_exec::MockQueryExec,
test_util::{
-ARBITRARY_CATALOG_PARTITION_ID, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_HASH_ID,
-ARBITRARY_TABLE_ID,
+ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID,
},
};
use arrow::array::{Float32Array, Int64Array};
-use data_types::PartitionHashId;
use futures::{stream, StreamExt};
use iox_time::MockProvider;
use metric::{assert_histogram, Attributes};
@ -457,8 +449,7 @@ mod tests {
// Construct a stream with no batches.
let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
vec![],
-ARBITRARY_CATALOG_PARTITION_ID,
-Some(ARBITRARY_PARTITION_HASH_ID.clone()),
+ARBITRARY_TRANSITION_PARTITION_ID.clone(),
42,
)]));

View File

@ -88,10 +88,10 @@ impl QueryAdaptor {
self.data
}
-/// Returns the partition ID from which the data this [`QueryAdaptor`] was
+/// Returns the partition identifier from which the data this [`QueryAdaptor`] was
/// sourced from.
-pub(crate) fn partition_id(&self) -> PartitionId {
-self.partition_id
+pub(crate) fn partition_id(&self) -> &TransitionPartitionId {
+&self.transition_partition_id
}
/// Number of rows, useful for building stats

View File

@ -6,7 +6,7 @@ use arrow_flight::{
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
SchemaResult, Ticket,
};
-use data_types::{NamespaceId, PartitionHashId, PartitionId, TableId};
+use data_types::{NamespaceId, TableId, TransitionPartitionId};
use flatbuffers::FlatBufferBuilder;
use futures::{Stream, StreamExt, TryStreamExt};
use ingester_query_grpc::influxdata::iox::ingester::v1 as proto;
@ -303,10 +303,8 @@ where
/// Encode the partition information as a None flight data with meatadata
fn encode_partition(
-// Partition ID.
-partition_id: PartitionId,
-// Partition hash ID.
-partition_hash_id: Option<PartitionHashId>,
+// Partition identifier.
+partition_id: TransitionPartitionId,
// Count of persisted Parquet files for the [`PartitionData`] instance this
// [`PartitionResponse`] was generated from.
//
@ -316,9 +314,17 @@ fn encode_partition(
ingester_id: IngesterId,
) -> Result<FlightData, FlightError> {
let mut bytes = bytes::BytesMut::new();
+let (partition_id, partition_hash_id) = match partition_id {
+TransitionPartitionId::Deterministic(hash_id) => {
+(None, Some(hash_id.as_bytes().to_owned()))
+}
+TransitionPartitionId::Deprecated(partition_id) => (Some(partition_id.get()), None),
+};
let app_metadata = proto::IngesterQueryResponseMetadata {
-partition_id: Some(partition_id.get()),
-partition_hash_id: partition_hash_id.map(|hash_id| hash_id.as_bytes().to_owned()),
+partition_id,
+partition_hash_id,
ingester_uuid: ingester_id.to_string(),
completed_persistence_count,
};
@ -352,18 +358,12 @@ fn encode_response(
frame_encoding_duration_metric: Arc<DurationHistogram>,
) -> impl Stream<Item = Result<FlightData, FlightError>> {
response.into_partition_stream().flat_map(move |partition| {
-let partition_id = partition.id();
-let partition_hash_id = partition.partition_hash_id().cloned();
+let partition_id = partition.id().clone();
let completed_persistence_count = partition.completed_persistence_count();
// prefix payload data w/ metadata for that particular partition
let head = futures::stream::once(async move {
-encode_partition(
-partition_id,
-partition_hash_id,
-completed_persistence_count,
-ingester_id,
-)
+encode_partition(partition_id, completed_persistence_count, ingester_id)
});
// An output vector of FlightDataEncoder streams, each entry stream with
@ -402,25 +402,24 @@ mod tests {
mock_query_exec::MockQueryExec, partition_response::PartitionResponse,
response::PartitionStream,
},
-test_util::ARBITRARY_PARTITION_HASH_ID,
+test_util::{ARBITRARY_PARTITION_HASH_ID, ARBITRARY_TRANSITION_PARTITION_ID},
};
use arrow::array::{Float64Array, Int32Array};
use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
use assert_matches::assert_matches;
use bytes::Bytes;
use data_types::PartitionId;
use tonic::Code;
#[tokio::test]
-async fn sends_partition_hash_id_if_present() {
+async fn sends_only_partition_hash_id_if_present() {
let ingester_id = IngesterId::new();
// let partition_hash_id = PartitionHashId::new(TableId::new(3), &ARBITRARY_PARTITION_KEY);
let flight = FlightService::new(
MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new(
futures::stream::iter([PartitionResponse::new(
vec![],
-PartitionId::new(2),
-Some(ARBITRARY_PARTITION_HASH_ID.clone()),
+ARBITRARY_TRANSITION_PARTITION_ID.clone(),
42,
)]),
)))),
@ -447,7 +446,7 @@ mod tests {
let md_actual =
proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap();
let md_expected = proto::IngesterQueryResponseMetadata {
-partition_id: Some(2),
+partition_id: None,
partition_hash_id: Some(ARBITRARY_PARTITION_HASH_ID.as_bytes().to_vec()),
ingester_uuid: ingester_id.to_string(),
completed_persistence_count: 42,
@ -462,8 +461,7 @@ mod tests {
MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new(
futures::stream::iter([PartitionResponse::new(
vec![],
-PartitionId::new(2),
-None,
+TransitionPartitionId::Deprecated(PartitionId::new(2)),
42,
)]),
)))),
@ -562,8 +560,7 @@ mod tests {
batch3.clone(),
batch4.clone(),
],
-PartitionId::new(2),
-partition_hash_id.clone(),
+ARBITRARY_TRANSITION_PARTITION_ID.clone(),
42,
)]),
)))),
@ -591,7 +588,7 @@ mod tests {
let md_actual =
proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap();
let md_expected = proto::IngesterQueryResponseMetadata {
-partition_id: Some(2),
+partition_id: None,
partition_hash_id: partition_hash_id.map(|hash_id| hash_id.as_bytes().to_vec()),
ingester_uuid: ingester_id.to_string(),
completed_persistence_count: 42,

View File

@ -256,6 +256,7 @@ macro_rules! make_partition_stream {
query::{response::PartitionStream, partition_response::PartitionResponse},
test_util::ARBITRARY_PARTITION_KEY,
};
+use data_types::{PartitionHashId, TableId, TransitionPartitionId};
use futures::stream;
PartitionStream::new(stream::iter([
@ -274,14 +275,7 @@ macro_rules! make_partition_stream {
PartitionResponse::new(
batches,
-// Using the $id as both the PartitionId and the TableId in the
-// PartitionHashId is a temporary way to reduce duplication in tests where
-// the important part is which batches are in the same partition and which
-// batches are in a different partition, not what the actual identifier
-// values are. This will go away when the ingester no longer sends
-// PartitionIds.
-data_types::PartitionId::new($id),
-Some(
+TransitionPartitionId::Deterministic(
PartitionHashId::new(
TableId::new($id),
&*ARBITRARY_PARTITION_KEY