Merge pull request #6428 from influxdata/cn/fix

fix: Correct *some* problems mkm found with ingester2/querier
pull/24376/head
kodiakhq[bot] 2022-12-16 22:52:16 +00:00 committed by GitHub
commit 2d8b245598
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 191 additions and 116 deletions

View File

@ -225,43 +225,6 @@ jobs:
echo "No changes to commit" echo "No changes to commit"
fi fi
test_rpc_write:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker:
- image: quay.io/influxdb/rust:ci
- image: vectorized/redpanda:v22.1.5
command: redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M
- image: postgres
environment:
POSTGRES_HOST_AUTH_METHOD: trust
resource_class: 2xlarge+ # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
# Disable full debug symbol generation to speed up CI build
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
# https://github.com/rust-lang/cargo/issues/10280
CARGO_NET_GIT_FETCH_WITH_CLI: "true"
RUST_BACKTRACE: "1"
# Run integration tests
TEST_INTEGRATION: 1
INFLUXDB_IOX_INTEGRATION_LOCAL: 1
KAFKA_CONNECT: "localhost:9092"
POSTGRES_USER: postgres
TEST_INFLUXDB_IOX_CATALOG_DSN: "postgres://postgres@localhost/iox_shared"
# When removing this, also remove the ignore on the test in trogging/src/cli.rs
RUST_LOG: debug,,hyper::proto::h1=info,h2=info
LOG_FILTER: debug,,hyper::proto::h1=info,h2=info
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Cargo test RPC write path
command: cargo test --workspace
- cache_save
test: test:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker) # setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker: docker:
@ -590,7 +553,6 @@ workflows:
- protobuf-lint - protobuf-lint
- docs-lint - docs-lint
- test - test
- test_rpc_write
- test_heappy - test_heappy
- build_dev - build_dev
- doc - doc
@ -610,7 +572,6 @@ workflows:
- protobuf-lint - protobuf-lint
- docs-lint - docs-lint
- test - test
- test_rpc_write
- test_heappy - test_heappy
- build_dev - build_dev
- build_release - build_release

View File

@ -5,7 +5,7 @@ pub(crate) mod name_resolver;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, ShardId, TableId};
use dml::DmlOperation; use dml::DmlOperation;
use metric::U64Counter; use metric::U64Counter;
use observability_deps::tracing::warn; use observability_deps::tracing::warn;
@ -77,6 +77,8 @@ pub(crate) struct NamespaceData<O> {
partition_provider: Arc<dyn PartitionProvider>, partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
transition_shard_id: ShardId,
} }
impl<O> NamespaceData<O> { impl<O> NamespaceData<O> {
@ -88,6 +90,7 @@ impl<O> NamespaceData<O> {
partition_provider: Arc<dyn PartitionProvider>, partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
metrics: &metric::Registry, metrics: &metric::Registry,
transition_shard_id: ShardId,
) -> Self { ) -> Self {
let table_count = metrics let table_count = metrics
.register_metric::<U64Counter>( .register_metric::<U64Counter>(
@ -104,6 +107,7 @@ impl<O> NamespaceData<O> {
table_count, table_count,
partition_provider, partition_provider,
post_write_observer, post_write_observer,
transition_shard_id,
} }
} }
@ -163,6 +167,7 @@ where
Arc::clone(&self.namespace_name), Arc::clone(&self.namespace_name),
Arc::clone(&self.partition_provider), Arc::clone(&self.partition_provider),
Arc::clone(&self.post_write_observer), Arc::clone(&self.post_write_observer),
self.transition_shard_id,
)) ))
}); });
@ -226,7 +231,7 @@ where
mod tests { mod tests {
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use data_types::{PartitionId, PartitionKey, ShardIndex}; use data_types::{PartitionId, PartitionKey, ShardId, ShardIndex};
use metric::{Attributes, Metric}; use metric::{Attributes, Metric};
use super::*; use super::*;
@ -246,6 +251,7 @@ mod tests {
const TABLE_ID: TableId = TableId::new(44); const TABLE_ID: TableId = TableId::new(44);
const NAMESPACE_NAME: &str = "platanos"; const NAMESPACE_NAME: &str = "platanos";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
#[tokio::test] #[tokio::test]
async fn test_namespace_init_table() { async fn test_namespace_init_table() {
@ -266,6 +272,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
)); ));
@ -276,6 +283,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
&metrics, &metrics,
TRANSITION_SHARD_ID,
); );
// Assert the namespace name was stored // Assert the namespace name was stored

View File

@ -2,7 +2,7 @@
use std::{collections::VecDeque, sync::Arc}; use std::{collections::VecDeque, sync::Arc};
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use mutable_batch::MutableBatch; use mutable_batch::MutableBatch;
use observability_deps::tracing::*; use observability_deps::tracing::*;
use schema::sort::SortKey; use schema::sort::SortKey;
@ -89,6 +89,8 @@ pub(crate) struct PartitionData {
/// The number of persist operations completed over the lifetime of this /// The number of persist operations completed over the lifetime of this
/// [`PartitionData`]. /// [`PartitionData`].
completed_persistence_count: u64, completed_persistence_count: u64,
transition_shard_id: ShardId,
} }
impl PartitionData { impl PartitionData {
@ -102,6 +104,7 @@ impl PartitionData {
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
sort_key: SortKeyState, sort_key: SortKeyState,
transition_shard_id: ShardId,
) -> Self { ) -> Self {
Self { Self {
partition_id: id, partition_id: id,
@ -115,6 +118,7 @@ impl PartitionData {
persisting: VecDeque::with_capacity(1), persisting: VecDeque::with_capacity(1),
started_persistence_count: BatchIdent::default(), started_persistence_count: BatchIdent::default(),
completed_persistence_count: 0, completed_persistence_count: 0,
transition_shard_id,
} }
} }
@ -293,6 +297,11 @@ impl PartitionData {
&self.partition_key &self.partition_key
} }
/// Return the transition_shard_id for this partition.
pub(crate) fn transition_shard_id(&self) -> ShardId {
self.transition_shard_id
}
/// Return the [`NamespaceId`] this partition is a part of. /// Return the [`NamespaceId`] this partition is a part of.
pub(crate) fn namespace_id(&self) -> NamespaceId { pub(crate) fn namespace_id(&self) -> NamespaceId {
self.namespace_id self.namespace_id
@ -344,6 +353,7 @@ mod tests {
use crate::{buffer_tree::partition::resolver::SortKeyResolver, test_util::populate_catalog}; use crate::{buffer_tree::partition::resolver::SortKeyResolver, test_util::populate_catalog};
const PARTITION_ID: PartitionId = PartitionId::new(1); const PARTITION_ID: PartitionId = PartitionId::new(1);
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
lazy_static! { lazy_static! {
static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos"); static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos");
@ -369,6 +379,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
// And no data should be returned when queried. // And no data should be returned when queried.
@ -449,6 +460,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
assert!(p.get_query_data().is_none()); assert!(p.get_query_data().is_none());
@ -599,6 +611,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
// Perform the initial write. // Perform the initial write.
@ -777,6 +790,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
// Perform the initial write. // Perform the initial write.
@ -957,6 +971,7 @@ mod tests {
TableName::from("platanos") TableName::from("platanos")
})), })),
starting_state, starting_state,
TRANSITION_SHARD_ID,
); );
let want = Some(SortKey::from_columns(["banana", "platanos", "time"])); let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
@ -1016,6 +1031,7 @@ mod tests {
TableName::from("platanos") TableName::from("platanos")
})), })),
starting_state, starting_state,
shard_id,
); );
let want = Some(SortKey::from_columns(["banana", "platanos", "time"])); let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
@ -1040,6 +1056,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
// Perform out of order writes. // Perform out of order writes.
@ -1087,6 +1104,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
assert!(p.mark_persisting().is_none()); assert!(p.mark_persisting().is_none());
@ -1106,6 +1124,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
@ -1132,6 +1151,7 @@ mod tests {
TABLE_NAME.clone() TABLE_NAME.clone()
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
assert!(p.get_query_data().is_none()); assert!(p.get_query_data().is_none());

View File

@ -2,7 +2,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use backoff::BackoffConfig; use backoff::BackoffConfig;
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId}; use data_types::{
NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId,
};
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -164,6 +166,7 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData { ) -> PartitionData {
// Use the cached PartitionKey instead of the caller's partition_key, // Use the cached PartitionKey instead of the caller's partition_key,
// instead preferring to reuse the already-shared Arc<str> in the cache. // instead preferring to reuse the already-shared Arc<str> in the cache.
@ -193,6 +196,7 @@ where
table_id, table_id,
table_name, table_name,
SortKeyState::Deferred(Arc::new(sort_key_resolver)), SortKeyState::Deferred(Arc::new(sort_key_resolver)),
transition_shard_id,
); );
} }
@ -206,6 +210,7 @@ where
namespace_name, namespace_name,
table_id, table_id,
table_name, table_name,
transition_shard_id,
) )
.await .await
} }
@ -213,12 +218,11 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use data_types::ShardId;
use iox_catalog::mem::MemCatalog; use iox_catalog::mem::MemCatalog;
use super::*; use super::*;
use crate::{ use crate::buffer_tree::partition::resolver::mock::MockPartitionProvider;
buffer_tree::partition::resolver::mock::MockPartitionProvider, TRANSITION_SHARD_ID,
};
const PARTITION_KEY: &str = "bananas"; const PARTITION_KEY: &str = "bananas";
const PARTITION_ID: PartitionId = PartitionId::new(42); const PARTITION_ID: PartitionId = PartitionId::new(42);
@ -226,6 +230,7 @@ mod tests {
const NAMESPACE_NAME: &str = "ns-bananas"; const NAMESPACE_NAME: &str = "ns-bananas";
const TABLE_ID: TableId = TableId::new(3); const TABLE_ID: TableId = TableId::new(3);
const TABLE_NAME: &str = "platanos"; const TABLE_NAME: &str = "platanos";
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
fn new_cache<P>( fn new_cache<P>(
inner: MockPartitionProvider, inner: MockPartitionProvider,
@ -257,6 +262,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
let inner = MockPartitionProvider::default().with_partition(data); let inner = MockPartitionProvider::default().with_partition(data);
@ -272,6 +278,7 @@ mod tests {
Arc::new(DeferredLoad::new(Duration::from_secs(1), async { Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
TRANSITION_SHARD_ID,
) )
.await; .await;
@ -310,6 +317,7 @@ mod tests {
Arc::new(DeferredLoad::new(Duration::from_secs(1), async { Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
TRANSITION_SHARD_ID,
) )
.await; .await;
@ -347,6 +355,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)); ));
let partition = Partition { let partition = Partition {
@ -370,6 +379,7 @@ mod tests {
Arc::new(DeferredLoad::new(Duration::from_secs(1), async { Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
TRANSITION_SHARD_ID,
) )
.await; .await;
@ -393,6 +403,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)); ));
let partition = Partition { let partition = Partition {
@ -416,6 +427,7 @@ mod tests {
Arc::new(DeferredLoad::new(Duration::from_secs(1), async { Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
TRANSITION_SHARD_ID,
) )
.await; .await;

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig}; use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, Partition, PartitionKey, TableId}; use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
@ -17,7 +17,6 @@ use crate::{
table::TableName, table::TableName,
}, },
deferred_load::DeferredLoad, deferred_load::DeferredLoad,
TRANSITION_SHARD_ID,
}; };
/// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve /// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
@ -43,12 +42,13 @@ impl CatalogPartitionResolver {
&self, &self,
partition_key: PartitionKey, partition_key: PartitionKey,
table_id: TableId, table_id: TableId,
transition_shard_id: ShardId,
) -> Result<Partition, iox_catalog::interface::Error> { ) -> Result<Partition, iox_catalog::interface::Error> {
self.catalog self.catalog
.repositories() .repositories()
.await .await
.partitions() .partitions()
.create_or_get(partition_key, TRANSITION_SHARD_ID, table_id) .create_or_get(partition_key, transition_shard_id, table_id)
.await .await
} }
} }
@ -62,16 +62,18 @@ impl PartitionProvider for CatalogPartitionResolver {
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData { ) -> PartitionData {
debug!( debug!(
%partition_key, %partition_key,
%table_id, %table_id,
%table_name, %table_name,
%transition_shard_id,
"upserting partition in catalog" "upserting partition in catalog"
); );
let p = Backoff::new(&self.backoff_config) let p = Backoff::new(&self.backoff_config)
.retry_all_errors("resolve partition", || { .retry_all_errors("resolve partition", || {
self.get(partition_key.clone(), table_id) self.get(partition_key.clone(), table_id, transition_shard_id)
}) })
.await .await
.expect("retry forever"); .expect("retry forever");
@ -87,6 +89,7 @@ impl PartitionProvider for CatalogPartitionResolver {
table_id, table_id,
table_name, table_name,
SortKeyState::Provided(p.sort_key()), SortKeyState::Provided(p.sort_key()),
transition_shard_id,
) )
} }
} }
@ -99,7 +102,6 @@ mod tests {
use data_types::ShardIndex; use data_types::ShardIndex;
use super::*; use super::*;
use crate::TRANSITION_SHARD_ID;
const TABLE_NAME: &str = "bananas"; const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "ns-bananas"; const NAMESPACE_NAME: &str = "ns-bananas";
@ -111,7 +113,7 @@ mod tests {
let catalog: Arc<dyn Catalog> = let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let (_shard_id, namespace_id, table_id) = { let (shard_id, namespace_id, table_id) = {
let mut repos = catalog.repositories().await; let mut repos = catalog.repositories().await;
let t = repos.topics().create_or_get("platanos").await.unwrap(); let t = repos.topics().create_or_get("platanos").await.unwrap();
let q = repos.query_pools().create_or_get("platanos").await.unwrap(); let q = repos.query_pools().create_or_get("platanos").await.unwrap();
@ -126,7 +128,6 @@ mod tests {
.create_or_get(&t, ShardIndex::new(0)) .create_or_get(&t, ShardIndex::new(0))
.await .await
.unwrap(); .unwrap();
assert_eq!(shard.id, TRANSITION_SHARD_ID);
let table = repos let table = repos
.tables() .tables()
@ -151,6 +152,7 @@ mod tests {
Arc::new(DeferredLoad::new(Duration::from_secs(1), async { Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
shard_id,
) )
.await; .await;

View File

@ -3,7 +3,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, TableId}; use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use parking_lot::Mutex; use parking_lot::Mutex;
use super::r#trait::PartitionProvider; use super::r#trait::PartitionProvider;
@ -54,6 +54,7 @@ impl PartitionProvider for MockPartitionProvider {
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
_transition_shard_id: ShardId,
) -> PartitionData { ) -> PartitionData {
let p = self let p = self
.partitions .partitions

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, TableId}; use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use crate::{ use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
@ -24,6 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData; ) -> PartitionData;
} }
@ -39,6 +40,7 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId, table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>, table_name: Arc<DeferredLoad<TableName>>,
transition_shard_id: ShardId,
) -> PartitionData { ) -> PartitionData {
(**self) (**self)
.get_partition( .get_partition(
@ -47,6 +49,7 @@ where
namespace_name, namespace_name,
table_id, table_id,
table_name, table_name,
transition_shard_id,
) )
.await .await
} }
@ -56,11 +59,13 @@ where
mod tests { mod tests {
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use data_types::PartitionId; use data_types::{PartitionId, ShardId};
use super::*; use super::*;
use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}; use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState};
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
#[tokio::test] #[tokio::test]
async fn test_arc_impl() { async fn test_arc_impl() {
let key = PartitionKey::from("bananas"); let key = PartitionKey::from("bananas");
@ -81,6 +86,7 @@ mod tests {
table_id, table_id,
Arc::clone(&table_name), Arc::clone(&table_name),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
); );
let mock = Arc::new(MockPartitionProvider::default().with_partition(data)); let mock = Arc::new(MockPartitionProvider::default().with_partition(data));
@ -92,6 +98,7 @@ mod tests {
Arc::clone(&namespace_name), Arc::clone(&namespace_name),
table_id, table_id,
Arc::clone(&table_name), Arc::clone(&table_name),
TRANSITION_SHARD_ID,
) )
.await; .await;
assert_eq!(got.partition_id(), partition); assert_eq!(got.partition_id(), partition);

View File

@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, ShardId, TableId};
use dml::DmlOperation; use dml::DmlOperation;
use metric::U64Counter; use metric::U64Counter;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -97,6 +97,7 @@ pub(crate) struct BufferTree<O> {
namespace_count: U64Counter, namespace_count: U64Counter,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
transition_shard_id: ShardId,
} }
impl<O> BufferTree<O> impl<O> BufferTree<O>
@ -110,6 +111,7 @@ where
partition_provider: Arc<dyn PartitionProvider>, partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
transition_shard_id: ShardId,
) -> Self { ) -> Self {
let namespace_count = metrics let namespace_count = metrics
.register_metric::<U64Counter>( .register_metric::<U64Counter>(
@ -126,6 +128,7 @@ where
partition_provider, partition_provider,
post_write_observer, post_write_observer,
namespace_count, namespace_count,
transition_shard_id,
} }
} }
@ -176,6 +179,7 @@ where
Arc::clone(&self.partition_provider), Arc::clone(&self.partition_provider),
Arc::clone(&self.post_write_observer), Arc::clone(&self.post_write_observer),
&self.metrics, &self.metrics,
self.transition_shard_id,
)) ))
}); });
@ -239,6 +243,7 @@ mod tests {
const TABLE_NAME: &str = "bananas"; const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos"; const NAMESPACE_NAME: &str = "platanos";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
#[tokio::test] #[tokio::test]
async fn test_namespace_init_table() { async fn test_namespace_init_table() {
@ -259,6 +264,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
)); ));
@ -270,6 +276,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
&metrics, &metrics,
TRANSITION_SHARD_ID,
); );
// Assert the namespace name was stored // Assert the namespace name was stored
@ -339,6 +346,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()), Arc::new(metric::Registry::default()),
TRANSITION_SHARD_ID,
); );
// Write the provided DmlWrites // Write the provided DmlWrites
@ -383,6 +391,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)], )],
writes = [make_write_op( writes = [make_write_op(
&PartitionKey::from("p1"), &PartitionKey::from("p1"),
@ -418,6 +427,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
PartitionData::new( PartitionData::new(
PartitionId::new(1), PartitionId::new(1),
@ -431,6 +441,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
) )
], ],
writes = [ writes = [
@ -478,6 +489,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
PartitionData::new( PartitionData::new(
PartitionId::new(1), PartitionId::new(1),
@ -491,6 +503,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
) )
], ],
writes = [ writes = [
@ -537,6 +550,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
PartitionData::new( PartitionData::new(
PartitionId::new(1), PartitionId::new(1),
@ -550,6 +564,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
) )
], ],
writes = [ writes = [
@ -597,6 +612,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)], )],
writes = [ writes = [
make_write_op( make_write_op(
@ -646,6 +662,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)) ))
.with_partition(PartitionData::new( .with_partition(PartitionData::new(
PartitionId::new(0), PartitionId::new(0),
@ -659,6 +676,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)), )),
); );
@ -671,6 +689,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Arc::clone(&metrics), Arc::clone(&metrics),
TRANSITION_SHARD_ID,
); );
// Write data to partition p1, in table "bananas". // Write data to partition p1, in table "bananas".
@ -740,6 +759,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)) ))
.with_partition(PartitionData::new( .with_partition(PartitionData::new(
PartitionId::new(1), PartitionId::new(1),
@ -753,6 +773,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)) ))
.with_partition(PartitionData::new( .with_partition(PartitionData::new(
PartitionId::new(2), PartitionId::new(2),
@ -766,6 +787,7 @@ mod tests {
TableName::from("another_table") TableName::from("another_table")
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)), )),
); );
@ -776,6 +798,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Arc::clone(&Arc::new(metric::Registry::default())), Arc::clone(&Arc::new(metric::Registry::default())),
TRANSITION_SHARD_ID,
); );
assert_eq!(buf.partitions().count(), 0); assert_eq!(buf.partitions().count(), 0);
@ -849,6 +872,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
)); ));
@ -859,6 +883,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()), Arc::new(metric::Registry::default()),
TRANSITION_SHARD_ID,
); );
// Query the empty tree // Query the empty tree
@ -932,6 +957,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)) ))
.with_partition(PartitionData::new( .with_partition(PartitionData::new(
PartitionId::new(1), PartitionId::new(1),
@ -945,6 +971,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
)), )),
); );
@ -955,6 +982,7 @@ mod tests {
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()), Arc::new(metric::Registry::default()),
TRANSITION_SHARD_ID,
); );
// Write data to partition p1, in table "bananas". // Write data to partition p1, in table "bananas".
@ -1013,10 +1041,11 @@ mod tests {
let partition = partitions.pop().unwrap(); let partition = partitions.pop().unwrap();
// Perform the partition read // Perform the partition read
let batches = let batches = datafusion::physical_plan::common::collect(
datafusion::physical_plan::common::collect(partition.into_record_batch_stream()) partition.into_record_batch_stream().unwrap(),
.await )
.expect("failed to collate query results"); .await
.expect("failed to collate query results");
// Assert the contents of p1 contains both the initial write, and the // Assert the contents of p1 contains both the initial write, and the
// 3rd write in a single RecordBatch. // 3rd write in a single RecordBatch.

View File

@ -5,7 +5,7 @@ pub(crate) mod name_resolver;
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use datafusion_util::MemoryStream; use datafusion_util::MemoryStream;
use mutable_batch::MutableBatch; use mutable_batch::MutableBatch;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
@ -116,6 +116,7 @@ pub(crate) struct TableData<O> {
partition_data: RwLock<DoubleRef>, partition_data: RwLock<DoubleRef>,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
transition_shard_id: ShardId,
} }
impl<O> TableData<O> { impl<O> TableData<O> {
@ -136,6 +137,7 @@ impl<O> TableData<O> {
namespace_name: Arc<DeferredLoad<NamespaceName>>, namespace_name: Arc<DeferredLoad<NamespaceName>>,
partition_provider: Arc<dyn PartitionProvider>, partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>, post_write_observer: Arc<O>,
transition_shard_id: ShardId,
) -> Self { ) -> Self {
Self { Self {
table_id, table_id,
@ -145,6 +147,7 @@ impl<O> TableData<O> {
partition_data: Default::default(), partition_data: Default::default(),
partition_provider, partition_provider,
post_write_observer, post_write_observer,
transition_shard_id,
} }
} }
@ -218,6 +221,7 @@ where
Arc::clone(&self.namespace_name), Arc::clone(&self.namespace_name),
self.table_id, self.table_id,
Arc::clone(&self.table_name), Arc::clone(&self.table_name),
self.transition_shard_id,
) )
.await; .await;
// Add the double-referenced partition to the map. // Add the double-referenced partition to the map.
@ -263,7 +267,7 @@ where
); );
// Gather the partition data from all of the partitions in this table. // Gather the partition data from all of the partitions in this table.
let partitions = self.partitions().into_iter().filter_map(move |p| { let partitions = self.partitions().into_iter().map(move |p| {
let mut span = SpanRecorder::new(span.clone().map(|s| s.child("partition read"))); let mut span = SpanRecorder::new(span.clone().map(|s| s.child("partition read")));
let (id, completed_persistence_count, data) = { let (id, completed_persistence_count, data) = {
@ -271,30 +275,32 @@ where
( (
p.partition_id(), p.partition_id(),
p.completed_persistence_count(), p.completed_persistence_count(),
p.get_query_data()?, p.get_query_data(),
) )
}; };
assert_eq!(id, data.partition_id());
// Project the data if necessary let ret = match data {
let columns = columns.iter().map(String::as_str).collect::<Vec<_>>(); Some(data) => {
let selection = if columns.is_empty() { assert_eq!(id, data.partition_id());
Projection::All
} else { // Project the data if necessary
Projection::Some(columns.as_ref()) let columns = columns.iter().map(String::as_str).collect::<Vec<_>>();
let selection = if columns.is_empty() {
Projection::All
} else {
Projection::Some(columns.as_ref())
};
let data = Box::pin(MemoryStream::new(
data.project_selection(selection).into_iter().collect(),
));
PartitionResponse::new(data, id, None, completed_persistence_count)
}
None => PartitionResponse::new_no_batches(id, None, completed_persistence_count),
}; };
let ret = PartitionResponse::new(
Box::pin(MemoryStream::new(
data.project_selection(selection).into_iter().collect(),
)),
id,
None,
completed_persistence_count,
);
span.ok("read partition data"); span.ok("read partition data");
Some(ret) ret
}); });
Ok(PartitionStream::new(futures::stream::iter(partitions))) Ok(PartitionStream::new(futures::stream::iter(partitions)))
@ -319,6 +325,7 @@ mod tests {
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const PARTITION_KEY: &str = "platanos"; const PARTITION_KEY: &str = "platanos";
const PARTITION_ID: PartitionId = PartitionId::new(0); const PARTITION_ID: PartitionId = PartitionId::new(0);
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
#[tokio::test] #[tokio::test]
async fn test_partition_double_ref() { async fn test_partition_double_ref() {
@ -337,6 +344,7 @@ mod tests {
TableName::from(TABLE_NAME) TableName::from(TABLE_NAME)
})), })),
SortKeyState::Provided(None), SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
), ),
)); ));
@ -351,6 +359,7 @@ mod tests {
})), })),
partition_provider, partition_provider,
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
TRANSITION_SHARD_ID,
); );
let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0) let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)

View File

@ -27,7 +27,7 @@ use crate::{
server::grpc::GrpcDelegate, server::grpc::GrpcDelegate,
timestamp_oracle::TimestampOracle, timestamp_oracle::TimestampOracle,
wal::{rotate_task::periodic_rotation, wal_sink::WalSink}, wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, TRANSITION_SHARD_INDEX,
}; };
/// Acquire opaque handles to the Ingester RPC service implementations. /// Acquire opaque handles to the Ingester RPC service implementations.
@ -168,7 +168,8 @@ pub async fn new(
.create_or_get("iox-shared") .create_or_get("iox-shared")
.await .await
.expect("get topic"); .expect("get topic");
txn.shards() let transition_shard = txn
.shards()
.create_or_get(&topic, TRANSITION_SHARD_INDEX) .create_or_get(&topic, TRANSITION_SHARD_INDEX)
.await .await
.expect("create transition shard"); .expect("create transition shard");
@ -199,7 +200,7 @@ pub async fn new(
.repositories() .repositories()
.await .await
.partitions() .partitions()
.most_recent_n(40_000, &[TRANSITION_SHARD_ID]) .most_recent_n(40_000, &[transition_shard.id])
.await .await
.map_err(InitError::PreWarmPartitions)?; .map_err(InitError::PreWarmPartitions)?;
@ -242,6 +243,7 @@ pub async fn new(
partition_provider, partition_provider,
Arc::new(hot_partition_persister), Arc::new(hot_partition_persister),
Arc::clone(&metrics), Arc::clone(&metrics),
transition_shard.id,
)); ));
// Initialise the WAL // Initialise the WAL

View File

@ -44,7 +44,7 @@
missing_docs missing_docs
)] )]
use data_types::{ShardId, ShardIndex}; use data_types::ShardIndex;
/// A macro to conditionally prepend `pub` to the inner tokens for benchmarking /// A macro to conditionally prepend `pub` to the inner tokens for benchmarking
/// purposes, should the `benches` feature be enabled. /// purposes, should the `benches` feature be enabled.
@ -60,9 +60,9 @@ macro_rules! maybe_pub {
}; };
} }
/// During the testing of ingester2, the catalog will require a ShardId for /// During the testing of ingester2, the catalog will require a ShardIndex for
/// various operations. This is a const value for these occasions. /// various operations. This is a const value for these occasions. Look up the ShardId for this
const TRANSITION_SHARD_ID: ShardId = ShardId::new(1); /// ShardIndex when needed.
const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1); const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1);
/// Ingester initialisation methods & types. /// Ingester initialisation methods & types.

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use backoff::Backoff; use backoff::Backoff;
use data_types::{ use data_types::{
CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber,
TableId, ShardId, TableId,
}; };
use iox_catalog::interface::get_table_schema_by_id; use iox_catalog::interface::get_table_schema_by_id;
use iox_time::{SystemProvider, TimeProvider}; use iox_time::{SystemProvider, TimeProvider};
@ -25,7 +25,6 @@ use crate::{
}, },
deferred_load::DeferredLoad, deferred_load::DeferredLoad,
persist::compact::{compact_persisting_batch, CompactedStream}, persist::compact::{compact_persisting_batch, CompactedStream},
TRANSITION_SHARD_ID,
}; };
use super::handle::Inner; use super::handle::Inner;
@ -79,6 +78,8 @@ pub(super) struct Context {
table_id: TableId, table_id: TableId,
partition_id: PartitionId, partition_id: PartitionId,
transition_shard_id: ShardId,
// The partition key for this partition // The partition key for this partition
partition_key: PartitionKey, partition_key: PartitionKey,
@ -162,6 +163,7 @@ impl Context {
enqueued_at, enqueued_at,
dequeued_at: Instant::now(), dequeued_at: Instant::now(),
permit, permit,
transition_shard_id: guard.transition_shard_id(),
} }
}; };
@ -233,7 +235,7 @@ impl Context {
let iox_metadata = IoxMetadata { let iox_metadata = IoxMetadata {
object_store_id, object_store_id,
creation_timestamp: SystemProvider::new().now(), creation_timestamp: SystemProvider::new().now(),
shard_id: TRANSITION_SHARD_ID, shard_id: self.transition_shard_id,
namespace_id: self.namespace_id, namespace_id: self.namespace_id,
namespace_name: Arc::clone(&*self.namespace_name.get().await), namespace_name: Arc::clone(&*self.namespace_name.get().await),
table_id: self.table_id, table_id: self.table_id,

View File

@ -430,7 +430,7 @@ mod tests {
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId}; use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId};
use dml::DmlOperation; use dml::DmlOperation;
use iox_catalog::mem::MemCatalog; use iox_catalog::mem::MemCatalog;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@ -460,6 +460,7 @@ mod tests {
const TABLE_ID: TableId = TableId::new(2442); const TABLE_ID: TableId = TableId::new(2442);
const TABLE_NAME: &str = "banana-report"; const TABLE_NAME: &str = "banana-report";
const NAMESPACE_NAME: &str = "platanos"; const NAMESPACE_NAME: &str = "platanos";
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
lazy_static! { lazy_static! {
static ref EXEC: Arc<Executor> = Arc::new(Executor::new_testing()); static ref EXEC: Arc<Executor> = Arc::new(Executor::new_testing());
@ -492,10 +493,12 @@ mod tests {
TABLE_ID, TABLE_ID,
Arc::clone(&TABLE_NAME_LOADER), Arc::clone(&TABLE_NAME_LOADER),
sort_key, sort_key,
TRANSITION_SHARD_ID,
)), )),
), ),
Arc::new(MockPostWriteObserver::default()), Arc::new(MockPostWriteObserver::default()),
Default::default(), Default::default(),
TRANSITION_SHARD_ID,
); );
buffer_tree buffer_tree

View File

@ -8,7 +8,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
/// Response data for a single partition. /// Response data for a single partition.
pub(crate) struct PartitionResponse { pub(crate) struct PartitionResponse {
/// Stream of snapshots. /// Stream of snapshots.
batches: SendableRecordBatchStream, batches: Option<SendableRecordBatchStream>,
/// Partition ID. /// Partition ID.
id: PartitionId, id: PartitionId,
@ -42,7 +42,20 @@ impl PartitionResponse {
completed_persistence_count: u64, completed_persistence_count: u64,
) -> Self { ) -> Self {
Self { Self {
batches, batches: Some(batches),
id,
max_persisted_sequence_number,
completed_persistence_count,
}
}
pub(crate) fn new_no_batches(
id: PartitionId,
max_persisted_sequence_number: Option<SequenceNumber>,
completed_persistence_count: u64,
) -> Self {
Self {
batches: None,
id, id,
max_persisted_sequence_number, max_persisted_sequence_number,
completed_persistence_count, completed_persistence_count,
@ -61,7 +74,7 @@ impl PartitionResponse {
self.completed_persistence_count self.completed_persistence_count
} }
pub(crate) fn into_record_batch_stream(self) -> SendableRecordBatchStream { pub(crate) fn into_record_batch_stream(self) -> Option<SendableRecordBatchStream> {
self.batches self.batches
} }
} }

View File

@ -51,6 +51,7 @@ impl QueryResponse {
/// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`]. /// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`].
pub(crate) fn into_record_batches(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> { pub(crate) fn into_record_batches(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
self.into_partition_stream() self.into_partition_stream()
.flat_map(|partition| partition.into_record_batch_stream()) .flat_map(|partition| futures::stream::iter(partition.into_record_batch_stream()))
.flatten()
} }
} }

View File

@ -322,21 +322,25 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
completed_persistence_count, completed_persistence_count,
}) })
}); });
let tail = partition
.into_record_batch_stream()
.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema = Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let schema_captured = Arc::clone(&schema); match partition.into_record_batch_stream() {
let head = futures::stream::once(async { Some(stream) => {
Ok(FlatIngesterQueryResponse::StartSnapshot { let tail = stream.flat_map(|snapshot_res| match snapshot_res {
schema: schema_captured, Ok(snapshot) => {
}) let schema =
}); Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let tail = let schema_captured = Arc::clone(&schema);
match prepare_batch_for_flight(&snapshot, Arc::clone(&schema)) { let head = futures::stream::once(async {
Ok(FlatIngesterQueryResponse::StartSnapshot {
schema: schema_captured,
})
});
let tail = match prepare_batch_for_flight(
&snapshot,
Arc::clone(&schema),
) {
Ok(batch) => { Ok(batch) => {
futures::stream::iter(split_batch_for_grpc_response(batch)) futures::stream::iter(split_batch_for_grpc_response(batch))
.map(|batch| { .map(|batch| {
@ -347,12 +351,15 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
Err(e) => futures::stream::once(async { Err(e) }).boxed(), Err(e) => futures::stream::once(async { Err(e) }).boxed(),
}; };
head.chain(tail).boxed() head.chain(tail).boxed()
} }
Err(e) => futures::stream::once(async { Err(e) }).boxed(), Err(e) => futures::stream::once(async { Err(e) }).boxed(),
}); });
head.chain(tail).boxed() head.chain(tail).boxed()
}
None => head.boxed(),
}
}) })
.boxed() .boxed()
} }

View File

@ -78,8 +78,6 @@ pub(crate) async fn populate_catalog(
.unwrap() .unwrap()
.id; .id;
assert_eq!(shard_id, crate::TRANSITION_SHARD_ID);
(shard_id, ns_id, table_id) (shard_id, ns_id, table_id)
} }