Merge pull request #4573 from influxdata/cn/write-info-grpc-more

feat: Querier can get write status from ingesters

commit 1bf519c2f6
@@ -2036,6 +2036,20 @@ impl TimestampMinMax {
     }
 }
+
+/// Specifies the status of data in the ingestion process.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum KafkaPartitionWriteStatus {
+    /// Nothing is known about this write (e.g. it refers to a kafka
+    /// partition for which we have no information)
+    KafkaPartitionUnknown,
+    /// The data has not yet been processed by the ingester, and thus is unreadable
+    Durable,
+    /// The data is readable, but not yet persisted
+    Readable,
+    /// The data is both readable and persisted to parquet
+    Persisted,
+}
 
 #[cfg(test)]
 mod tests {
     use super::*;
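Note on the new enum: the variants form a monotonic lifecycle (KafkaPartitionUnknown → Durable → Readable → Persisted), which is what makes the status-order merging introduced later in this commit well defined. A minimal sketch of how a caller might act on it, assuming only the enum added above (is_queryable is a hypothetical helper, not part of this diff):

    use data_types::KafkaPartitionWriteStatus;

    // A write is visible to queries once it is at least Readable.
    fn is_queryable(status: KafkaPartitionWriteStatus) -> bool {
        matches!(
            status,
            KafkaPartitionWriteStatus::Readable | KafkaPartitionWriteStatus::Persisted
        )
    }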
@@ -215,6 +215,8 @@ pub mod google;
 pub mod delete_predicate;
 #[cfg(any(feature = "data_types_conversions", test))]
 pub mod ingester;
+#[cfg(any(feature = "data_types_conversions", test))]
+pub mod write_info;
 
 pub use prost::{DecodeError, EncodeError};
 
@@ -0,0 +1,161 @@
+use crate::influxdata::iox::ingester::v1 as proto;
+use data_types::KafkaPartitionWriteStatus;
+use std::collections::HashMap;
+
+impl From<KafkaPartitionWriteStatus> for proto::KafkaPartitionStatus {
+    fn from(status: KafkaPartitionWriteStatus) -> Self {
+        match status {
+            KafkaPartitionWriteStatus::KafkaPartitionUnknown => Self::Unknown,
+            KafkaPartitionWriteStatus::Durable => Self::Durable,
+            KafkaPartitionWriteStatus::Readable => Self::Readable,
+            KafkaPartitionWriteStatus::Persisted => Self::Persisted,
+        }
+    }
+}
+
+impl proto::KafkaPartitionStatus {
+    /// Convert the status to a number such that higher numbers are later in the data lifecycle.
+    /// For use in merging multiple write status gRPC responses into one response.
+    fn status_order(&self) -> u8 {
+        match self {
+            Self::Unspecified => panic!("Unspecified status"),
+            Self::Unknown => 0,
+            Self::Durable => 1,
+            Self::Readable => 2,
+            Self::Persisted => 3,
+        }
+    }
+}
+
+impl proto::KafkaPartitionInfo {
+    fn merge(&mut self, other: &Self) {
+        let self_status = self.status();
+        let other_status = other.status();
+
+        let new_status = match self_status.status_order().cmp(&other_status.status_order()) {
+            std::cmp::Ordering::Less => other_status,
+            std::cmp::Ordering::Equal => self_status,
+            std::cmp::Ordering::Greater => self_status,
+        };
+
+        self.set_status(new_status);
+    }
+}
+
+/// "Merges" the partition information for write info responses so that the "most recent"
+/// information is returned.
+pub fn merge_responses(
+    responses: impl IntoIterator<Item = proto::GetWriteInfoResponse>,
+) -> proto::GetWriteInfoResponse {
+    // Map kafka partition id to status
+    let mut partition_infos: HashMap<_, proto::KafkaPartitionInfo> = HashMap::new();
+
+    responses
+        .into_iter()
+        .flat_map(|res| res.kafka_partition_infos.into_iter())
+        .for_each(|info| {
+            partition_infos
+                .entry(info.kafka_partition_id)
+                .and_modify(|existing_info| existing_info.merge(&info))
+                .or_insert(info);
+        });
+
+    let kafka_partition_infos = partition_infos
+        .into_iter()
+        .map(|(_kafka_partition_id, info)| info)
+        .collect();
+
+    proto::GetWriteInfoResponse {
+        kafka_partition_infos,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use proto::{KafkaPartitionInfo, KafkaPartitionStatus};
+
+    #[test]
+    fn test_merge() {
+        #[derive(Debug)]
+        struct Test<'a> {
+            left: &'a KafkaPartitionInfo,
+            right: &'a KafkaPartitionInfo,
+            expected: &'a KafkaPartitionInfo,
+        }
+
+        let durable = KafkaPartitionInfo {
+            kafka_partition_id: 1,
+            status: KafkaPartitionStatus::Durable.into(),
+        };
+
+        let readable = KafkaPartitionInfo {
+            kafka_partition_id: 1,
+            status: KafkaPartitionStatus::Readable.into(),
+        };
+
+        let persisted = KafkaPartitionInfo {
+            kafka_partition_id: 1,
+            status: KafkaPartitionStatus::Persisted.into(),
+        };
+
+        let unknown = KafkaPartitionInfo {
+            kafka_partition_id: 1,
+            status: KafkaPartitionStatus::Unknown.into(),
+        };
+
+        let tests = vec![
+            Test {
+                left: &unknown,
+                right: &unknown,
+                expected: &unknown,
+            },
+            Test {
+                left: &unknown,
+                right: &durable,
+                expected: &durable,
+            },
+            Test {
+                left: &unknown,
+                right: &readable,
+                expected: &readable,
+            },
+            Test {
+                left: &durable,
+                right: &unknown,
+                expected: &durable,
+            },
+            Test {
+                left: &readable,
+                right: &readable,
+                expected: &readable,
+            },
+            Test {
+                left: &durable,
+                right: &durable,
+                expected: &durable,
+            },
+            Test {
+                left: &readable,
+                right: &durable,
+                expected: &readable,
+            },
+            Test {
+                left: &persisted,
+                right: &durable,
+                expected: &persisted,
+            },
+        ];
+
+        for test in tests {
+            let mut output = test.left.clone();
+
+            output.merge(test.right);
+            assert_eq!(
+                &output, test.expected,
+                "Mismatch\n\nOutput:\n{:#?}\n\nTest:\n{:#?}",
+                output, test
+            );
+        }
+    }
+}
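Usage sketch for merge_responses, assuming only the proto types and functions in this new file (the example function is illustrative, not part of the diff). Two ingesters report different stages for the same kafka partition; the merged response keeps the later stage:

    use crate::influxdata::iox::ingester::v1 as proto;

    fn merge_example() -> proto::GetWriteInfoResponse {
        let durable = proto::GetWriteInfoResponse {
            kafka_partition_infos: vec![proto::KafkaPartitionInfo {
                kafka_partition_id: 1,
                status: proto::KafkaPartitionStatus::Durable.into(),
            }],
        };
        let readable = proto::GetWriteInfoResponse {
            kafka_partition_infos: vec![proto::KafkaPartitionInfo {
                kafka_partition_id: 1,
                status: proto::KafkaPartitionStatus::Readable.into(),
            }],
        };
        // Readable out-ranks Durable in status_order, so partition 1 merges to Readable.
        merge_responses([durable, readable])
    }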
@@ -29,7 +29,7 @@ async fn smoke() {
     // wait for data to be persisted to parquet
     assert_eq!(response.status(), StatusCode::NO_CONTENT);
     let write_token = get_write_token(&response);
-    wait_for_persisted(write_token, all_in_one.ingester_grpc_connection()).await;
+    wait_for_persisted(write_token, all_in_one.querier_grpc_connection()).await;
 
     // run query
     let sql = format!("select * from {}", table_name);
@@ -103,8 +103,9 @@ async fn basic_no_ingester_connection() {
         &mut cluster,
         vec![
            Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
-            // Wait for data to be persisted to parquet
-            Step::WaitForPersisted,
+            // Wait for data to be persisted to parquet, ask the ingester directly because the
+            // querier is not connected to the ingester
+            Step::WaitForPersistedAccordingToIngester,
             Step::Query {
                 sql: format!("select * from {}", table_name),
                 expected: vec![
@@ -120,8 +120,8 @@ async fn basic_multi_ingesters() {
     StepTest::new(&mut cluster, test_steps).run().await
 }
 
-/// Return a combined readable response from the ingesters that is
-/// readable for all partitions. Retries forever
+/// Use the WriteInfo API on the querier that will combine write info from all the ingesters it
+/// knows about to get the status of data
 async fn get_multi_ingester_readable_combined_response(
     state: &mut StepTestState<'_>,
 ) -> GetWriteInfoResponse {
@@ -9,6 +9,7 @@ pub mod generated_types {
         write_info_service_client, write_info_service_server, GetWriteInfoRequest,
         GetWriteInfoResponse, KafkaPartitionInfo, KafkaPartitionStatus,
     };
+    pub use generated_types::write_info::merge_responses;
 }
 
 /// A basic client for fetching information about write tokens from a
@@ -25,7 +25,7 @@ use std::{pin::Pin, sync::Arc, task::Poll};
 use tokio::task::JoinHandle;
 use tonic::{Request, Response, Streaming};
 use trace::ctx::SpanContext;
-use write_summary::{KafkaPartitionWriteStatus, WriteSummary};
+use write_summary::WriteSummary;
 
 /// This type is responsible for managing all gRPC services exposed by
 /// `ingester`.
@@ -67,15 +67,6 @@ impl WriteInfoServiceImpl {
     }
 }
 
-fn to_proto_status(status: KafkaPartitionWriteStatus) -> proto::KafkaPartitionStatus {
-    match status {
-        KafkaPartitionWriteStatus::KafkaPartitionUnknown => proto::KafkaPartitionStatus::Unknown,
-        KafkaPartitionWriteStatus::Durable => proto::KafkaPartitionStatus::Durable,
-        KafkaPartitionWriteStatus::Readable => proto::KafkaPartitionStatus::Readable,
-        KafkaPartitionWriteStatus::Persisted => proto::KafkaPartitionStatus::Persisted,
-    }
-}
-
 #[tonic::async_trait]
 impl WriteInfoService for WriteInfoServiceImpl {
     async fn get_write_info(
@@ -101,7 +92,7 @@ impl WriteInfoService for WriteInfoServiceImpl {
 
                 Ok(proto::KafkaPartitionInfo {
                     kafka_partition_id: kafka_partition_id.get(),
-                    status: to_proto_status(status).into(),
+                    status: proto::KafkaPartitionStatus::from(status).into(),
                 })
             })
             .collect::<Result<Vec<_>, tonic::Status>>()?;
@@ -78,15 +78,19 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
         let builder = setup_builder!(builder_input, self);
         add_service!(
             builder,
-            rpc::query::make_flight_server(Arc::clone(&self.database),)
+            rpc::query::make_flight_server(Arc::clone(&self.database))
         );
         add_service!(
             builder,
-            rpc::query::make_storage_server(Arc::clone(&self.database),)
+            rpc::query::make_storage_server(Arc::clone(&self.database))
         );
         add_service!(
             builder,
-            rpc::namespace::namespace_service(Arc::clone(&self.database),)
+            rpc::namespace::namespace_service(Arc::clone(&self.database))
         );
+        add_service!(
+            builder,
+            rpc::write_info::write_info_service(Arc::clone(&self.database))
+        );
         add_service!(builder, self.server.handler().schema_service());
         serve_builder!(builder);
 
@@ -1,2 +1,3 @@
 pub(crate) mod namespace;
 pub(crate) mod query;
+pub(crate) mod write_info;
@@ -0,0 +1,44 @@
+//! WriteInfoService gRPC implementation
+
+use generated_types::influxdata::iox::ingester::v1::{
+    self as proto,
+    write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
+};
+use querier::QuerierDatabase;
+use std::sync::Arc;
+
+/// Acquire a [`WriteInfoService`] gRPC service implementation.
+pub fn write_info_service(
+    server: Arc<QuerierDatabase>,
+) -> WriteInfoServiceServer<impl WriteInfoService> {
+    WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server))
+}
+
+#[derive(Debug)]
+struct QuerierWriteInfoServiceImpl {
+    server: Arc<QuerierDatabase>,
+}
+
+impl QuerierWriteInfoServiceImpl {
+    pub fn new(server: Arc<QuerierDatabase>) -> Self {
+        Self { server }
+    }
+}
+
+#[tonic::async_trait]
+impl WriteInfoService for QuerierWriteInfoServiceImpl {
+    async fn get_write_info(
+        &self,
+        request: tonic::Request<proto::GetWriteInfoRequest>,
+    ) -> Result<tonic::Response<proto::GetWriteInfoResponse>, tonic::Status> {
+        let proto::GetWriteInfoRequest { write_token } = request.into_inner();
+
+        let progresses = self
+            .server
+            .get_write_info(&write_token)
+            .await
+            .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+        Ok(tonic::Response::new(progresses))
+    }
+}
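With this service registered, clients can poll the querier for write status instead of talking to each ingester. A hedged sketch using the influxdb_iox_client write_info client shown elsewhere in this PR (the function and its wiring are illustrative):

    use influxdb_iox_client::{connection::Connection, error::Error, write_info};

    // Ask the querier (which proxies to and merges across its ingesters)
    // how far a write token has progressed.
    async fn print_write_status(querier_connection: Connection, write_token: &str) -> Result<(), Error> {
        let mut client = write_info::Client::new(querier_connection);
        let response = client.get_write_info(write_token).await?;
        println!("{:?}", response.kafka_partition_infos);
        Ok(())
    }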
@@ -1,12 +1,16 @@
 //! Database for the querier that contains all namespaces.
 
 use crate::{
-    cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
-    namespace::QuerierNamespace, query_log::QueryLog,
+    cache::CatalogCache,
+    chunk::ParquetChunkAdapter,
+    ingester::{Error, IngesterConnection},
+    namespace::QuerierNamespace,
+    query_log::QueryLog,
 };
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
 use data_types::Namespace;
+use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
 use object_store::DynObjectStore;
 use parking_lot::RwLock;
 use query::exec::Executor;
@@ -47,7 +51,7 @@ pub struct QuerierDatabase {
     /// Executor for queries.
     exec: Arc<Executor>,
 
-    /// Connection to ingester
+    /// Connection to ingester(s)
     ingester_connection: Arc<dyn IngesterConnection>,
 
     /// Query log.
@@ -123,6 +127,11 @@ impl QuerierDatabase {
             .expect("retry forever")
     }
 
+    /// Get write info for the given token, by proxying the request to the ingester
+    pub async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse, Error> {
+        self.ingester_connection.get_write_info(write_token).await
+    }
+
     /// Executor
     pub(crate) fn exec(&self) -> &Executor {
         &self.exec
@@ -5,13 +5,17 @@ use self::{
 use crate::cache::CatalogCache;
 use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
 use async_trait::async_trait;
+use client_util::connection;
 use data_types::{
     ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber,
     SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax,
 };
 use datafusion_util::MemoryStream;
 use futures::{stream::FuturesUnordered, TryStreamExt};
-use generated_types::ingester::IngesterQueryRequest;
+use generated_types::{
+    influxdata::iox::ingester::v1::GetWriteInfoResponse, ingester::IngesterQueryRequest,
+    write_info::merge_responses,
+};
 use observability_deps::tracing::{debug, trace};
 use predicate::{Predicate, PredicateMatch};
 use query::{
@@ -29,19 +33,26 @@ pub(crate) mod test_util;
 #[derive(Debug, Snafu)]
 #[allow(missing_copy_implementations, missing_docs)]
 pub enum Error {
     #[snafu(display("Failed to select columns: {}", source))]
     SelectColumns { source: schema::Error },
 
-    #[snafu(display("Internal error: ingester record batch for colum '{}' has type '{}' but should have type '{}'",
-                    column_name, actual_data_type, desired_data_type))]
+    #[snafu(display(
+        "Internal error: \
+         ingester record batch for column '{}' has type '{}' but should have type '{}'",
+        column_name,
+        actual_data_type,
+        desired_data_type
+    ))]
     RecordBatchType {
         column_name: String,
         actual_data_type: DataType,
         desired_data_type: DataType,
     },
 
-    #[snafu(display("Internal error: failed to resolve ingester record batch types for column '{}' type '{}': {}",
-                    column_name, data_type, source))]
+    #[snafu(display(
+        "Internal error: \
+         failed to resolve ingester record batch types for column '{}' type '{}': {}",
+        column_name,
+        data_type,
+        source
+    ))]
     ConvertingRecordBatch {
         column_name: String,
         data_type: DataType,
@@ -51,9 +62,6 @@ pub enum Error {
     #[snafu(display("Internal error creating record batch: {}", source))]
     CreatingRecordBatch { source: ArrowError },
 
-    #[snafu(display("Internal error creating IOx schema: {}", source))]
-    CreatingSchema { source: schema::Error },
-
     #[snafu(display("Failed ingester query '{}': {}", ingester_address, source))]
     RemoteQuery {
         ingester_address: String,
@@ -87,6 +95,24 @@ pub enum Error {
         partition_id: i64,
         ingester_address: String,
     },
+
+    #[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))]
+    Connecting {
+        ingester_address: String,
+        source: connection::Error,
+    },
+
+    #[snafu(display(
+        "Error retrieving write info from '{}' for write token '{}': {}",
+        ingester_address,
+        write_token,
+        source,
+    ))]
+    WriteInfo {
+        ingester_address: String,
+        write_token: String,
+        source: influxdb_iox_client::error::Error,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -122,6 +148,10 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
         expected_schema: Arc<Schema>,
     ) -> Result<Vec<Arc<IngesterPartition>>>;
 
+    /// Returns the most recent partition status info across all ingester(s) for the specified
+    /// write token.
+    async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse>;
+
     /// Return backend as [`Any`] which can be used to downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
 }
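The implementation (next hunk) fans the request out to every configured ingester address with FuturesUnordered and merges the responses. A generic sketch of that fan-out/collect pattern, with illustrative types in place of the PR's request code:

    use futures::{stream::FuturesUnordered, TryStreamExt};

    // One future per address, gathered as they complete; the first error aborts.
    async fn fan_out(addresses: &[String]) -> Result<Vec<String>, std::io::Error> {
        addresses
            .iter()
            .map(|addr| async move { Ok(format!("response from {}", addr)) })
            .collect::<FuturesUnordered<_>>()
            .try_collect::<Vec<_>>()
            .await
    }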
@@ -333,11 +363,41 @@ impl IngesterConnection for IngesterConnectionImpl {
         Ok(ingester_partitions)
     }
 
+    async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
+        let responses = self
+            .ingester_addresses
+            .iter()
+            .map(|ingester_address| execute_get_write_infos(ingester_address, write_token))
+            .collect::<FuturesUnordered<_>>()
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        Ok(merge_responses(responses))
+    }
+
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
 }
 
+async fn execute_get_write_infos(
+    ingester_address: &str,
+    write_token: &str,
+) -> Result<GetWriteInfoResponse, Error> {
+    let connection = connection::Builder::new()
+        .build(ingester_address)
+        .await
+        .context(ConnectingSnafu { ingester_address })?;
+
+    influxdb_iox_client::write_info::Client::new(connection)
+        .get_write_info(write_token)
+        .await
+        .context(WriteInfoSnafu {
+            ingester_address,
+            write_token,
+        })
+}
+
 /// A wrapper around the unpersisted data in a partition returned by
 /// the ingester that (will) implement the `QueryChunk` interface
 ///
@@ -578,23 +638,24 @@ impl QueryChunk for IngesterPartition {
 /// 2. NULL-column creation
 ///
 /// # Dictionary Type Recovery
-/// Cast arrays in record batch to be the type of schema this is a
-/// workaround for
-/// <https://github.com/influxdata/influxdb_iox/pull/4273> where the
-/// flight API doesn't necessarily return the same schema as was
-/// provided by the ingester.
-///
-/// Namely, dictionary encoded columns (e.g. tags) are returned as
-/// `DataType::Utf8` even when they were sent as
-/// `DataType::Dictionary(Int32, Utf8)`.
+/// Cast arrays in record batch to be the type of schema. This is a workaround for
+/// <https://github.com/influxdata/influxdb_iox/pull/4273> where the Flight API doesn't necessarily
+/// return the same schema as was provided by the ingester.
+///
+/// Namely, dictionary encoded columns (e.g. tags) are returned as `DataType::Utf8` even when they
+/// were sent as `DataType::Dictionary(Int32, Utf8)`.
 ///
 /// # NULL-column Creation
-/// If a column is absent in an ingester partition it will not be part of record batch even when the querier requests
-/// it. In that case we create it as "all NULL" column with the appropriate type.
-///
-/// An alternative would be to remove the column from the schema of the appropriate [`IngesterPartition`]. However since
-/// a partition may contain multiple record batches and we do not want to assume that the presence/absence of columns is
-/// identical for all of them, we fix this here.
+/// If a column is absent in an ingester partition it will not be part of record batch even when
+/// the querier requests it. In that case we create it as "all NULL" column with the appropriate
+/// type.
+///
+/// An alternative would be to remove the column from the schema of the appropriate
+/// [`IngesterPartition`]. However, since a partition may contain multiple record batches and we do
+/// not want to assume that the presence/absence of columns is identical for all of them, we fix
+/// this here.
 fn ensure_schema(batch: RecordBatch, expected_schema: &Schema) -> Result<RecordBatch> {
     let actual_schema = batch.schema();
     let desired_fields = expected_schema.iter().map(|(_, f)| f);
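A sketch of the dictionary-recovery cast the doc comment above describes, assuming arrow's cast compute kernel (exact module paths and signatures depend on the arrow version in use):

    use arrow::array::{ArrayRef, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::DataType;
    use std::sync::Arc;

    fn recover_dictionary() -> ArrayRef {
        // The Flight API handed back a plain Utf8 column...
        let utf8: ArrayRef = Arc::new(StringArray::from(vec!["tag_a", "tag_b", "tag_a"]));
        // ...but the ingester's schema says it is dictionary encoded.
        let want = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        cast(&utf8, &want).expect("utf8 is castable to a utf8 dictionary")
    }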
@@ -1,9 +1,8 @@
-use std::{any::Any, sync::Arc};
-
-use async_trait::async_trait;
-use parking_lot::Mutex;
-
 use super::IngesterConnection;
+use async_trait::async_trait;
+use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
+use parking_lot::Mutex;
+use std::{any::Any, sync::Arc};
 
 /// IngesterConnection for testing
 #[derive(Debug, Default)]
@@ -40,6 +39,10 @@ impl IngesterConnection for MockIngesterConnection {
             .unwrap_or_else(|| Ok(vec![]))
     }
 
+    async fn get_write_info(&self, _write_token: &str) -> super::Result<GetWriteInfoResponse> {
+        unimplemented!()
+    }
+
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -7,10 +7,10 @@ use influxdb_iox_client::{
     connection::Connection,
     flight::generated_types::ReadInfo,
     write::generated_types::{DatabaseBatch, TableBatch, WriteRequest, WriteResponse},
-    write_info::generated_types::{GetWriteInfoResponse, KafkaPartitionInfo, KafkaPartitionStatus},
+    write_info::generated_types::{merge_responses, GetWriteInfoResponse, KafkaPartitionStatus},
 };
 use observability_deps::tracing::info;
-use std::{collections::HashMap, time::Duration};
+use std::time::Duration;
 
 /// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
 pub async fn write_to_router(
@@ -83,12 +83,12 @@ pub fn get_write_token_from_grpc(response: &tonic::Response<WriteResponse>) -> S
         .to_string()
 }
 
-/// returns the write info from ingester_connection for this token
+/// returns the write info from the connection (to either an ingester or a querier) for this token
 pub async fn token_info(
     write_token: impl AsRef<str>,
-    ingester_connection: Connection,
+    connection: Connection,
 ) -> Result<GetWriteInfoResponse, influxdb_iox_client::error::Error> {
-    influxdb_iox_client::write_info::Client::new(ingester_connection)
+    influxdb_iox_client::write_info::Client::new(connection)
         .get_write_info(write_token.as_ref())
         .await
 }
@@ -138,11 +138,8 @@ pub async fn token_is_persisted(
 const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
 
 /// Waits for the specified predicate to return true
-pub async fn wait_for_token<F>(
-    write_token: impl Into<String>,
-    ingester_connection: Connection,
-    f: F,
-) where
+pub async fn wait_for_token<F>(write_token: impl Into<String>, connection: Connection, f: F)
+where
     F: Fn(&GetWriteInfoResponse) -> bool,
 {
     let write_token = write_token.into();
@@ -151,7 +148,7 @@ pub async fn wait_for_token<F>(
     info!(" write token: {}", write_token);
 
     let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
-    let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection);
+    let mut write_info_client = influxdb_iox_client::write_info::Client::new(connection);
     tokio::time::timeout(retry_duration, async move {
         let mut interval = tokio::time::interval(Duration::from_millis(500));
         loop {
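A distilled sketch of the poll-with-deadline pattern this loop uses (constants inlined; the predicate stands in for the write-info check):

    use std::time::Duration;

    // Retry a check every 500ms until it passes or the overall timeout expires.
    async fn poll_until(check: impl Fn() -> bool) -> Result<(), tokio::time::error::Elapsed> {
        tokio::time::timeout(Duration::from_secs(20), async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            loop {
                if check() {
                    return;
                }
                interval.tick().await;
            }
        })
        .await
    }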
@@ -175,10 +172,10 @@
 }
 
 /// Waits for the specified write token to be readable
-pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connection: Connection) {
+pub async fn wait_for_readable(write_token: impl Into<String>, connection: Connection) {
     info!("Waiting for write token to be readable");
 
-    wait_for_token(write_token, ingester_connection, |res| {
+    wait_for_token(write_token, connection, |res| {
         if all_readable(res) {
             info!("Write is readable: {:?}", res);
             true
@@ -190,10 +187,10 @@ pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connecti
 }
 
 /// Waits for the write token to be persisted
-pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connection: Connection) {
+pub async fn wait_for_persisted(write_token: impl Into<String>, connection: Connection) {
     info!("Waiting for write token to be persisted");
 
-    wait_for_token(write_token, ingester_connection, |res| {
+    wait_for_token(write_token, connection, |res| {
         if all_persisted(res) {
             info!("Write is persisted: {:?}", res);
             true
@@ -225,61 +222,6 @@ pub fn all_persisted(res: &GetWriteInfoResponse) -> bool {
         .all(|info| matches!(info.status(), KafkaPartitionStatus::Persisted))
 }
 
-/// "merges" the partition information for write info responses so
-/// that the "most recent" information is returned
-fn merge_responses(
-    responses: impl IntoIterator<Item = GetWriteInfoResponse>,
-) -> GetWriteInfoResponse {
-    // make kafka partition id to status
-    let mut partition_infos: HashMap<_, KafkaPartitionInfo> = HashMap::new();
-
-    responses
-        .into_iter()
-        .flat_map(|res| res.kafka_partition_infos.into_iter())
-        .for_each(|info| {
-            partition_infos
-                .entry(info.kafka_partition_id)
-                .and_modify(|existing_info| merge_info(existing_info, &info))
-                .or_insert(info);
-        });
-
-    let kafka_partition_infos = partition_infos
-        .into_iter()
-        .map(|(_kafka_partition_id, info)| info)
-        .collect();
-
-    GetWriteInfoResponse {
-        kafka_partition_infos,
-    }
-}
-
-// convert the status to a number such that higher numbers are later
-// in the data lifecycle
-fn status_order(status: KafkaPartitionStatus) -> u8 {
-    match status {
-        KafkaPartitionStatus::Unspecified => panic!("Unspecified status"),
-        KafkaPartitionStatus::Unknown => 0,
-        KafkaPartitionStatus::Durable => 1,
-        KafkaPartitionStatus::Readable => 2,
-        KafkaPartitionStatus::Persisted => 3,
-    }
-}
-
-fn merge_info(left: &mut KafkaPartitionInfo, right: &KafkaPartitionInfo) {
-    info!("existing_info {:?}, info: {:?}", left, right);
-
-    let left_status = left.status();
-    let right_status = right.status();
-
-    let new_status = match status_order(left_status).cmp(&status_order(right_status)) {
-        std::cmp::Ordering::Less => right_status,
-        std::cmp::Ordering::Equal => left_status,
-        std::cmp::Ordering::Greater => left_status,
-    };
-
-    left.set_status(new_status);
-}
-
 /// Runs a query using the flight API on the specified connection
 pub async fn run_query(
     sql: impl Into<String>,
@@ -304,92 +246,3 @@
 
     response.collect().await.expect("Error executing query")
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_merge_info() {
-        #[derive(Debug)]
-        struct Test<'a> {
-            left: &'a KafkaPartitionInfo,
-            right: &'a KafkaPartitionInfo,
-            expected: &'a KafkaPartitionInfo,
-        }
-
-        let durable = KafkaPartitionInfo {
-            kafka_partition_id: 1,
-            status: KafkaPartitionStatus::Durable.into(),
-        };
-
-        let readable = KafkaPartitionInfo {
-            kafka_partition_id: 1,
-            status: KafkaPartitionStatus::Readable.into(),
-        };
-
-        let persisted = KafkaPartitionInfo {
-            kafka_partition_id: 1,
-            status: KafkaPartitionStatus::Persisted.into(),
-        };
-
-        let unknown = KafkaPartitionInfo {
-            kafka_partition_id: 1,
-            status: KafkaPartitionStatus::Unknown.into(),
-        };
-
-        let tests = vec![
-            Test {
-                left: &unknown,
-                right: &unknown,
-                expected: &unknown,
-            },
-            Test {
-                left: &unknown,
-                right: &durable,
-                expected: &durable,
-            },
-            Test {
-                left: &unknown,
-                right: &readable,
-                expected: &readable,
-            },
-            Test {
-                left: &durable,
-                right: &unknown,
-                expected: &durable,
-            },
-            Test {
-                left: &readable,
-                right: &readable,
-                expected: &readable,
-            },
-            Test {
-                left: &durable,
-                right: &durable,
-                expected: &durable,
-            },
-            Test {
-                left: &readable,
-                right: &durable,
-                expected: &readable,
-            },
-            Test {
-                left: &persisted,
-                right: &durable,
-                expected: &persisted,
-            },
-        ];
-
-        for test in tests {
-            let mut output = test.left.clone();
-
-            merge_info(&mut output, test.right);
-            assert_eq!(
-                &output, test.expected,
-                "Mismatch\n\nOutput:\n{:#?}\n\nTest:\n{:#?}",
-                output, test
-            );
-        }
-    }
-}
@@ -84,6 +84,10 @@ pub enum Step {
     /// Wait for all previously written data to be persisted
     WaitForPersisted,
 
+    /// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't
+    /// know about the ingester, so the test needs to ask the ingester directly.
+    WaitForPersistedAccordingToIngester,
+
     /// Run a query using the FlightSQL interface and verify that the
     /// results match the expected results using the
     /// `assert_batches_eq!` macro
@@ -154,14 +158,25 @@ impl<'a> StepTest<'a> {
                 }
                 Step::WaitForReadable => {
                     info!("====Begin waiting for all write tokens to be readable");
-                    let ingester_grpc_connection =
-                        state.cluster().ingester().ingester_grpc_connection();
+                    let querier_grpc_connection =
+                        state.cluster().querier().querier_grpc_connection();
                     for write_token in &state.write_tokens {
-                        wait_for_readable(write_token, ingester_grpc_connection.clone()).await;
+                        wait_for_readable(write_token, querier_grpc_connection.clone()).await;
                     }
                     info!("====Done waiting for all write tokens to be readable");
                 }
                 Step::WaitForPersisted => {
                     info!("====Begin waiting for all write tokens to be persisted");
-                    let ingester_grpc_connection =
-                        state.cluster().ingester().ingester_grpc_connection();
+                    let querier_grpc_connection =
+                        state.cluster().querier().querier_grpc_connection();
+                    for write_token in &state.write_tokens {
+                        wait_for_persisted(write_token, querier_grpc_connection.clone()).await;
+                    }
+                    info!("====Done waiting for all write tokens to be persisted");
+                }
+                // Specifically for cases when the querier doesn't know about the ingester so the
+                // test needs to ask the ingester directly.
+                Step::WaitForPersistedAccordingToIngester => {
+                    info!("====Begin waiting for all write tokens to be persisted");
+                    let ingester_grpc_connection =
+                        state.cluster().ingester().ingester_grpc_connection();
@@ -172,11 +187,11 @@
                 }
                 Step::AssertNotPersisted => {
                     info!("====Begin checking all tokens not persisted");
-                    let ingester_grpc_connection =
-                        state.cluster().ingester().ingester_grpc_connection();
+                    let querier_grpc_connection =
+                        state.cluster().querier().querier_grpc_connection();
                     for write_token in &state.write_tokens {
                         let persisted =
-                            token_is_persisted(write_token, ingester_grpc_connection.clone()).await;
+                            token_is_persisted(write_token, querier_grpc_connection.clone()).await;
                         assert!(!persisted);
                     }
                     info!("====Done checking all tokens not persisted");
@@ -1,4 +1,4 @@
-use data_types::{KafkaPartition, SequenceNumber};
+use data_types::{KafkaPartition, KafkaPartitionWriteStatus, SequenceNumber};
 use dml::DmlMeta;
 /// Protobuf to/from conversion
 use generated_types::influxdata::iox::write_summary::v1 as proto;
@@ -37,19 +37,6 @@ pub struct WriteSummary {
     sequencers: BTreeMap<KafkaPartition, Vec<SequenceNumber>>,
 }
 
-#[derive(Debug, Clone, PartialEq)]
-pub enum KafkaPartitionWriteStatus {
-    /// Nothing is known about this write (e.g. it refers to a kafka
-    /// partition for which we have no information)
-    KafkaPartitionUnknown,
-    /// The data has not yet been processed by the ingester, and thus is unreadable
-    Durable,
-    /// The data is readable, but not yet persisted
-    Readable,
-    /// The data is both readable and persisted to parquet
-    Persisted,
-}
-
 impl WriteSummary {
     pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
         debug!(?metas, "Creating write summary");