Merge branch 'main' into dom/service-limit-metric-labels

pull/24376/head
Dom 2023-01-26 13:52:10 +00:00 committed by GitHub
commit 054101dabb
41 changed files with 333 additions and 62 deletions

View File

@ -884,6 +884,7 @@ pub mod tests {
compaction_level: CompactionLevel::Initial, // level of file of new writes
created_at: time_9_hour_ago, // create cold files by default
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_9_hour_ago,
};
// Note: The order of the test cases below is important and should not be changed

View File

@ -185,6 +185,7 @@ mod tests {
created_at: Timestamp::new(1),
compaction_level: CompactionLevel::Initial,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()
@ -265,6 +266,7 @@ mod tests {
created_at: Timestamp::new(1),
compaction_level: CompactionLevel::Initial,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()
@ -349,6 +351,7 @@ mod tests {
created_at: Timestamp::new(1),
compaction_level: CompactionLevel::Initial,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = txn
.parquet_files()

View File

@ -650,9 +650,10 @@ impl CompactPlan {
.context(ExecuteCompactPlanSnafu)?;
trace!(partition = i, "built result stream for partition");
let time_now = time_provider.now();
let meta = IoxMetadata {
object_store_id: Uuid::new_v4(),
creation_timestamp: time_provider.now(),
creation_timestamp: time_now,
shard_id: partition.shard_id(),
namespace_id: partition.namespace_id(),
namespace_name: partition.namespace.name.clone().into(),
@ -663,6 +664,7 @@ impl CompactPlan {
max_sequence_number,
compaction_level: target_level,
sort_key: Some(sort_key.clone()),
max_l0_created_at: time_now,
};
debug!(

View File

@ -1836,6 +1836,7 @@ mod tests {
compaction_level,
created_at: Timestamp::new(12),
column_set: ColumnSet::new(std::iter::empty()),
max_l0_created_at: Timestamp::new(12),
};
// Estimated arrow bytes for one file with a tag, a time and 11 rows = 1176
CompactorParquetFile::new(f, ESTIMATED_STREAM_BYTES, (file_size_bytes * 2) as u64)

View File

@ -56,6 +56,29 @@ mod tests {
(6, CompactionLevel::Initial),
]
);
// verify ID and max_l0_created_at
let time_provider = Arc::clone(&setup.config.time_provider);
let time_1_minute_future = time_provider.minutes_into_future(1).timestamp_nanos();
let time_2_minutes_future = time_provider.minutes_into_future(2).timestamp_nanos();
let time_3_minutes_future = time_provider.minutes_into_future(3).timestamp_nanos();
let time_5_minutes_future = time_provider.minutes_into_future(5).timestamp_nanos();
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
assert_eq!(
files_and_max_l0_created_ats,
vec![
(1, time_1_minute_future),
(2, time_2_minutes_future),
(3, time_5_minutes_future),
(4, time_3_minutes_future),
(5, time_5_minutes_future),
(6, time_2_minutes_future),
]
);
// compact
run_compact(&setup).await;
@ -77,6 +100,16 @@ mod tests {
(8, CompactionLevel::FileNonOverlapped),
]
);
// verify ID and max_l0_created_at
let files_and_max_l0_created_ats: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.max_l0_created_at.get()))
.collect();
// both files have max_l0_created_at of time_5_minutes_future, which is the max of all the L0 inputs' max_l0_created_at
assert_eq!(
files_and_max_l0_created_ats,
vec![(7, time_5_minutes_future), (8, time_5_minutes_future),]
);
// verify the content of files
// Compacted smaller file with the later data

View File

@ -193,7 +193,7 @@ impl QueryChunk for QueryableParquetChunk {
// . We can only compact different sets of files of the same partition concurrently into the same target_level.
// We use the following rules to set the order of a chunk based on its (compaction_level, target_level):
// . compaction_level < target_level : the order is `created_at`
// . compaction_level == target_level : order is `0` to make sure it is in the front of the ordered list.
// . compaction_level == target_level : order is 0 to make sure it is in the front of the ordered list.
// This means that the chunks of `compaction_level == target_level` will be in arbitrary order, which is
// fine as long as they are in front of the chunks of `compaction_level < target_level`.
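A minimal sketch of the ordering rule described above (illustrative only; the function name, integer types, and standalone form are assumptions, not the crate's actual API):

// Order key for a chunk, given its compaction level and the compaction target level.
fn chunk_order(compaction_level: u8, target_level: u8, created_at: i64) -> i64 {
    if compaction_level == target_level {
        // Already at the target level: sort to the front of the ordered list.
        0
    } else {
        // Still below the target level: order by creation time.
        created_at
    }
}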

View File

@ -4,6 +4,7 @@ use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_query::exec::{Executor, ExecutorType};
use iox_time::Time;
use crate::partition_info::PartitionInfo;
@ -49,11 +50,16 @@ where
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError> {
let inner = Arc::clone(&self.inner);
self.exec
.executor(ExecutorType::Reorg)
.spawn(async move { inner.store(stream, partition, level).await })
.spawn(async move {
inner
.store(stream, partition, level, max_l0_created_at)
.await
})
.await
.map_err(|e| DataFusionError::External(e.into()))?
}
@ -92,7 +98,11 @@ mod tests {
));
let partition = partition_info();
let level = CompactionLevel::FileNonOverlapped;
let err = sink.store(stream, partition, level).await.unwrap_err();
let max_l0_created_at = Time::from_timestamp_nanos(0);
let err = sink
.store(stream, partition, level, max_l0_created_at)
.await
.unwrap_err();
assert_eq!(err.to_string(), "External error: foo",);
}
}

View File

@ -3,6 +3,7 @@ use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_time::Time;
use observability_deps::tracing::{info, warn};
use crate::partition_info::PartitionInfo;
@ -45,10 +46,11 @@ where
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError> {
let res = self
.inner
.store(stream, Arc::clone(&partition), level)
.store(stream, Arc::clone(&partition), level, max_l0_created_at)
.await;
match &res {
Ok(Some(f)) => {

View File

@ -13,6 +13,7 @@ use datafusion::{
physical_plan::SendableRecordBatchStream,
};
use futures::TryStreamExt;
use iox_time::Time;
use uuid::Uuid;
use crate::partition_info::PartitionInfo;
@ -57,6 +58,7 @@ impl ParquetFileSink for MockParquetFileSink {
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError> {
let schema = stream.schema();
let batches: Vec<_> = stream.try_collect().await?;
@ -76,6 +78,7 @@ impl ParquetFileSink for MockParquetFileSink {
compaction_level: level,
created_at: Timestamp::new(1),
column_set: ColumnSet::new(vec![]),
max_l0_created_at: max_l0_created_at.into(),
});
guard.push(StoredFile {
batches,
@ -118,13 +121,14 @@ mod tests {
.as_arrow();
let partition = partition_info();
let level = CompactionLevel::FileNonOverlapped;
let max_l0_created_at = Time::from_timestamp_nanos(1);
let stream = Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::empty(),
));
assert_eq!(
sink.store(stream, Arc::clone(&partition), level)
sink.store(stream, Arc::clone(&partition), level, max_l0_created_at)
.await
.unwrap(),
None,
@ -137,7 +141,7 @@ mod tests {
futures::stream::once(async move { Ok(record_batch_captured) }),
));
assert_eq!(
sink.store(stream, Arc::clone(&partition), level)
sink.store(stream, Arc::clone(&partition), level, max_l0_created_at)
.await
.unwrap(),
None,
@ -154,7 +158,7 @@ mod tests {
futures::stream::once(async move { Ok(record_batch_captured) }),
));
assert_eq!(
sink.store(stream, Arc::clone(&partition), level)
sink.store(stream, Arc::clone(&partition), level, max_l0_created_at)
.await
.unwrap(),
Some(ParquetFileParams {
@ -170,7 +174,8 @@ mod tests {
row_count: 1,
compaction_level: CompactionLevel::FileNonOverlapped,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([])
column_set: ColumnSet::new([]),
max_l0_created_at: max_l0_created_at.into(),
}),
);

View File

@ -6,6 +6,7 @@ use std::{
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_time::Time;
use crate::partition_info::PartitionInfo;
@ -24,5 +25,6 @@ pub trait ParquetFileSink: Debug + Display + Send + Sync {
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError>;
}

View File

@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber, ShardId};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_time::TimeProvider;
use iox_time::{Time, TimeProvider};
use parquet_file::{
metadata::IoxMetadata,
serialize::CodecError,
@ -52,6 +52,7 @@ impl ParquetFileSink for ObjectStoreParquetFileSink {
stream: SendableRecordBatchStream,
partition: Arc<PartitionInfo>,
level: CompactionLevel,
max_l0_created_at: Time,
) -> Result<Option<ParquetFileParams>, DataFusionError> {
let meta = IoxMetadata {
object_store_id: Uuid::new_v4(),
@ -66,6 +67,7 @@ impl ParquetFileSink for ObjectStoreParquetFileSink {
max_sequence_number: SequenceNumber::new(MAX_SEQUENCE_NUMBER),
compaction_level: level,
sort_key: partition.sort_key.clone(),
max_l0_created_at,
};
// Stream the record batches from the compaction exec, serialize

View File

@ -3,6 +3,7 @@ use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use iox_time::Time;
use parquet_file::ParquetFilePath;
use tracker::InstrumentedAsyncSemaphore;
@ -115,6 +116,13 @@ async fn try_compact_partition(
while let Some(branch) = branches.pop() {
let delete_ids = branch.iter().map(|f| f.id).collect::<Vec<_>>();
// compute max_l0_created_at
let max_l0_created_at = branch
.iter()
.map(|f| f.max_l0_created_at)
.max()
.expect("max_l0_created_at should have value");
// stage files
let input_paths: Vec<ParquetFilePath> = branch.iter().map(|f| f.into()).collect();
let input_uuids_inpad = scratchpad_ctx.load_to_scratchpad(&input_paths).await;
@ -154,6 +162,7 @@ async fn try_compact_partition(
streams,
Arc::clone(partition_info),
target_level,
max_l0_created_at.into(),
Arc::clone(&components),
);
@ -242,6 +251,7 @@ fn stream_into_file_sink(
streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
max_l0_created_at: Time,
components: Arc<Components>,
) -> impl Future<Output = Result<Vec<ParquetFileParams>, Error>> {
streams
@ -252,7 +262,7 @@ fn stream_into_file_sink(
async move {
components
.parquet_file_sink
.store(stream, partition_info, target_level)
.store(stream, partition_info, target_level, max_l0_created_at)
.await
}
})

View File

@ -8,7 +8,7 @@ use data_types::{
};
use datafusion::arrow::record_batch::RecordBatch;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use iox_time::TimeProvider;
use parquet_file::storage::{ParquetStorage, StorageId};
use schema::sort::SortKey;
use uuid::Uuid;
@ -42,6 +42,7 @@ impl ParquetFileBuilder {
compaction_level: CompactionLevel::FileNonOverlapped,
created_at: Timestamp::new(0),
column_set: ColumnSet::new(vec![]),
max_l0_created_at: Timestamp::new(0),
},
}
}
@ -298,14 +299,13 @@ impl TestSetup {
partition_key: partition.partition.partition_key.clone(),
});
let time_provider = Arc::<iox_time::MockProvider>::clone(&catalog.time_provider);
let mut parquet_files = vec![];
if with_files {
let time = SystemProvider::new();
let time_16_minutes_ago = time.minutes_ago(16);
let time_5_minutes_ago = time.minutes_ago(5);
let time_2_minutes_ago = time.minutes_ago(2);
let time_1_minute_ago = time.minutes_ago(1);
let time_now = time.now();
let time_1_minute_future = time_provider.minutes_into_future(1);
let time_2_minutes_future = time_provider.minutes_into_future(2);
let time_3_minutes_future = time_provider.minutes_into_future(3);
let time_5_minutes_future = time_provider.minutes_into_future(5);
// L1 file
let lp = vec![
@ -315,7 +315,8 @@ impl TestSetup {
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_1_minute_ago)
.with_creation_time(time_3_minutes_future)
.with_max_l0_created_at(time_1_minute_future)
.with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
let level_1_file_1_minute_ago = partition.create_parquet_file(builder).await.into();
@ -329,7 +330,8 @@ impl TestSetup {
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_16_minutes_ago)
.with_creation_time(time_2_minutes_future)
.with_max_l0_created_at(time_2_minutes_future)
.with_compaction_level(CompactionLevel::Initial);
let level_0_file_16_minutes_ago = partition.create_parquet_file(builder).await.into();
@ -342,7 +344,8 @@ impl TestSetup {
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_5_minutes_ago)
.with_creation_time(time_5_minutes_future)
.with_max_l0_created_at(time_5_minutes_future)
.with_compaction_level(CompactionLevel::Initial);
let level_0_file_5_minutes_ago = partition.create_parquet_file(builder).await.into();
@ -356,9 +359,10 @@ impl TestSetup {
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_1_minute_ago)
.with_creation_time(time_5_minutes_future)
.with_max_l0_created_at(time_3_minutes_future)
.with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
let level_1_file_1_minute_ago_with_duplicates =
let level_1_file_1_minute_ago_with_duplicates: ParquetFile =
partition.create_parquet_file(builder).await.into();
// L0 file
@ -367,7 +371,8 @@ impl TestSetup {
.with_line_protocol(&lp)
.with_min_time(0)
.with_max_time(36000)
.with_creation_time(time_now)
.with_creation_time(time_5_minutes_future)
.with_max_l0_created_at(time_5_minutes_future)
// Will put the group size between "small" and "large"
.with_size_override(50 * 1024 * 1024)
.with_compaction_level(CompactionLevel::Initial);
@ -383,7 +388,8 @@ impl TestSetup {
.with_line_protocol(&lp)
.with_min_time(36001)
.with_max_time(136000)
.with_creation_time(time_2_minutes_ago)
.with_creation_time(time_2_minutes_future)
.with_max_l0_created_at(time_2_minutes_future)
// Will put the group size two multiples over "large"
.with_size_override(180 * 1024 * 1024)
.with_compaction_level(CompactionLevel::Initial);
@ -411,7 +417,7 @@ impl TestSetup {
Arc::new(object_store::memory::InMemory::new()),
StorageId::from("scratchpad"),
),
time_provider: Arc::<iox_time::MockProvider>::clone(&catalog.time_provider),
time_provider,
exec: Arc::clone(&catalog.exec),
backoff_config: BackoffConfig::default(),
partition_concurrency: NonZeroUsize::new(1).unwrap(),

View File

@ -360,6 +360,12 @@ impl From<iox_time::Time> for Timestamp {
}
}
impl From<Timestamp> for iox_time::Time {
fn from(time: Timestamp) -> iox_time::Time {
iox_time::Time::from_timestamp_nanos(time.get())
}
}
impl Add for Timestamp {
type Output = Self;
@ -1105,6 +1111,8 @@ pub struct ParquetFile {
/// The columns that are present in the table-wide schema are sorted according to the partition
/// sort key. They occur in the parquet file in this order.
pub column_set: ColumnSet,
/// The max created_at of all L0 files; needed for file/chunk ordering for deduplication
pub max_l0_created_at: Timestamp,
}
impl ParquetFile {
@ -1128,6 +1136,7 @@ impl ParquetFile {
compaction_level: params.compaction_level,
created_at: params.created_at,
column_set: params.column_set,
max_l0_created_at: params.max_l0_created_at,
}
}
@ -1167,6 +1176,8 @@ pub struct ParquetFileParams {
pub created_at: Timestamp,
/// columns in this file.
pub column_set: ColumnSet,
/// the max of created_at of all L0 files
pub max_l0_created_at: Timestamp,
}
impl From<ParquetFile> for ParquetFileParams {
@ -1185,6 +1196,7 @@ impl From<ParquetFile> for ParquetFileParams {
compaction_level: value.compaction_level,
created_at: value.created_at,
column_set: value.column_set,
max_l0_created_at: value.max_l0_created_at,
}
}
}

View File

@ -171,6 +171,7 @@ mod tests {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos

View File

@ -38,4 +38,6 @@ message ParquetFile {
int64 created_at = 15;
// Set of columns within this parquet file.
repeated int64 column_set = 16;
// max creation timestamp of all L0 files compacted into this parquet file
int64 max_l0_created_at = 18;
}

View File

@ -52,6 +52,9 @@ message IoxMetadata {
// the compaction level of the file
int32 compaction_level = 16;
// max creation time of all L0 files compacted into this file
google.protobuf.Timestamp max_l0_created_at = 18;
}
// Sort key of a chunk.

View File

@ -365,6 +365,7 @@ async fn load_parquet_files(
.expect("compaction level should be valid"),
created_at: Timestamp::new(p.created_at),
column_set: ColumnSet::new(p.column_set.into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(p.max_l0_created_at),
};
repos.parquet_files().create(params).await?
@ -579,6 +580,7 @@ mod tests {
compaction_level: CompactionLevel::Initial as i32,
created_at: created_at.get(),
column_set: vec![1, 2],
max_l0_created_at: created_at.get(),
}],
)
.await
@ -603,6 +605,7 @@ mod tests {
compaction_level: CompactionLevel::Initial,
created_at,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: created_at,
}];
assert_eq!(expected, files);
}

View File

@ -405,14 +405,14 @@ async fn write_and_query() {
.arg("../test_fixtures/lineproto/air_and_water.lp")
// gzipped line protocol ('m0')
.arg("../test_fixtures/lineproto/read_filter.lp.gz")
// iox formatted parquet ('cpu' measurement)
// iox formatted parquet ('cpu' measurement)
.arg("../test_fixtures/cpu.parquet")
.assert()
.success()
// this number is the total size of
// uncompressed line protocol stored in all
// three files
.stdout(predicate::str::contains("1137058 Bytes OK"));
.stdout(predicate::str::contains("889317 Bytes OK"));
}
.boxed()
})),

View File

@ -3,12 +3,18 @@ mod multi_ingester;
use std::time::Duration;
use arrow::datatypes::{DataType, SchemaRef};
use arrow_flight::{
decode::{DecodedFlightData, DecodedPayload},
error::FlightError,
};
use arrow_util::assert_batches_sorted_eq;
use assert_cmd::{assert::Assert, Command};
use futures::{FutureExt, StreamExt, TryStreamExt};
use generated_types::{
aggregate::AggregateType, read_group_request::Group, read_response::frame::Data,
};
use influxdb_iox_client::flight::IOxRecordBatchStream;
use predicates::prelude::*;
use test_helpers::assert_contains;
use test_helpers_end_to_end::{
@ -92,7 +98,7 @@ mod with_kafka {
let mut client =
influxdb_iox_client::flight::Client::new(querier_connection);
let result_stream = client.sql(namespace.into(), sql).await.unwrap();
let result_stream = client.sql(namespace, &sql).await.unwrap();
let mut flight_stream = result_stream.into_inner();
@ -103,6 +109,15 @@ mod with_kafka {
// otherwise other clients may complain
// https://github.com/influxdata/influxdb_iox/pull/6668
assert!(flight_stream.got_schema());
// run the query again and ensure there are no dictionaries
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
// run a query that does return results and ensure there are no dictionaries
let sql = format!("select * from {table_name}");
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
}
.boxed()
})),
@ -1043,7 +1058,7 @@ mod kafkaless_rpc_write {
let mut client =
influxdb_iox_client::flight::Client::new(querier_connection);
let result_stream = client.sql(namespace.into(), sql).await.unwrap();
let result_stream = client.sql(namespace, &sql).await.unwrap();
let mut flight_stream = result_stream.into_inner();
@ -1054,6 +1069,15 @@ mod kafkaless_rpc_write {
// otherwise other clients may complain
// https://github.com/influxdata/influxdb_iox/pull/6668
assert!(flight_stream.got_schema());
// run the query again and ensure there are no dictionaries
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
// run a query that does return results and ensure there are no dictionaries
let sql = format!("select * from {table_name}");
let result_stream = client.sql(namespace, sql).await.unwrap();
verify_schema(result_stream).await;
}
.boxed()
})),
@ -1175,3 +1199,36 @@ mod kafkaless_rpc_write {
StepTest::new(&mut cluster, steps).run().await
}
}
/// Some clients, such as the golang ones, cannot decode
/// dictionary-encoded Flight data. This function asserts that all
/// schemas received in the stream contain no dictionary-encoded fields
pub(crate) async fn verify_schema(stream: IOxRecordBatchStream) {
let flight_stream = stream.into_inner().into_inner();
let decoded_data: Result<Vec<DecodedFlightData>, FlightError> =
flight_stream.try_collect().await;
// no errors
let decoded_data = decoded_data.unwrap();
// the schemas should not contain any dictionary-encoded fields
// as go clients can't deal with this
for DecodedFlightData { inner: _, payload } in decoded_data {
match payload {
DecodedPayload::None => {}
DecodedPayload::Schema(s) => assert_no_dictionaries(s),
DecodedPayload::RecordBatch(b) => assert_no_dictionaries(b.schema()),
}
}
}
fn assert_no_dictionaries(schema: SchemaRef) {
for field in schema.fields() {
let dt = field.data_type();
assert!(
!matches!(dt, DataType::Dictionary(_, _)),
"Found unexpected dictionary in schema: {schema:#?}"
);
}
}

View File

@ -125,7 +125,7 @@ impl From<tonic::Status> for Error {
///
/// // results is a stream of RecordBatches
/// let query_results = client
/// .sql("my_namespace".into(), "select * from cpu_load".into())
/// .sql("my_namespace", "select * from cpu_load")
/// .await
/// .expect("query request should work");
///
@ -173,12 +173,12 @@ impl Client {
/// a struct that can stream Arrow [`RecordBatch`] results.
pub async fn sql(
&mut self,
namespace_name: String,
sql_query: String,
namespace_name: impl Into<String> + Send,
sql_query: impl Into<String> + Send,
) -> Result<IOxRecordBatchStream, Error> {
let request = ReadInfo {
namespace_name,
sql_query,
namespace_name: namespace_name.into(),
sql_query: sql_query.into(),
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
};
@ -190,12 +190,12 @@ impl Client {
/// a struct that can stream Arrow [`RecordBatch`] results.
pub async fn influxql(
&mut self,
namespace_name: String,
influxql_query: String,
namespace_name: impl Into<String> + Send,
influxql_query: impl Into<String> + Send,
) -> Result<IOxRecordBatchStream, Error> {
let request = ReadInfo {
namespace_name,
sql_query: influxql_query,
namespace_name: namespace_name.into(),
sql_query: influxql_query.into(),
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
};

View File

@ -479,6 +479,7 @@ mod tests {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
repos
.parquet_files()

View File

@ -450,9 +450,10 @@ impl Persister for IngesterData {
let object_store_id = Uuid::new_v4();
// Construct the metadata for this parquet file.
let time_now = SystemProvider::new().now();
let iox_metadata = IoxMetadata {
object_store_id,
creation_timestamp: SystemProvider::new().now(),
creation_timestamp: time_now,
shard_id,
namespace_id,
namespace_name: Arc::clone(&*namespace.namespace_name().get().await),
@ -463,6 +464,7 @@ impl Persister for IngesterData {
max_sequence_number: batch_sequence_number_range.inclusive_max().unwrap(),
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
max_l0_created_at: time_now,
};
// Save the compacted data to a parquet file in object storage.
@ -1120,7 +1122,7 @@ mod tests {
// different, the file may change slightly from time to time
//
// https://github.com/influxdata/influxdb_iox/issues/5434
let expected_size = 1252;
let expected_size = 1265;
let allowable_delta = 10;
let size_delta = (pf.file_size_bytes - expected_size).abs();
assert!(

View File

@ -13,7 +13,7 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::{CompactionLevel, ParquetFile, PartitionKey, ShardId};
use data_types::{CompactionLevel, ParquetFile, PartitionKey, ShardId, SequenceNumber};
use dml::DmlOperation;
use futures::TryStreamExt;
use iox_catalog::{
@ -259,18 +259,21 @@ mod tests {
row_count,
compaction_level,
file_size_bytes,
created_at,
max_l0_created_at,
..
}] =>
{
assert_eq!(created_at.get(), max_l0_created_at.get());
assert_eq!(got_namespace_id, namespace_id);
assert_eq!(got_table_id, table_id);
assert_eq!(got_partition_id, partition_id);
assert_eq!(max_sequence_number, SequenceNumber::new(0));
assert_eq!(row_count, 1);
assert_eq!(compaction_level, CompactionLevel::Initial);
assert_eq!(max_sequence_number.get(), 0); // Unused, dummy value
(object_store_id, file_size_bytes)
}
);
@ -406,9 +409,13 @@ mod tests {
row_count,
compaction_level,
file_size_bytes,
created_at,
max_l0_created_at,
..
}] =>
{
assert_eq!(created_at.get(), max_l0_created_at.get());
assert_eq!(got_namespace_id, namespace_id);
assert_eq!(got_table_id, table_id);
assert_eq!(got_partition_id, partition_id);

View File

@ -253,9 +253,10 @@ where
);
// Construct the metadata for this parquet file.
let time_now = SystemProvider::new().now();
let iox_metadata = IoxMetadata {
object_store_id,
creation_timestamp: SystemProvider::new().now(),
creation_timestamp: time_now,
shard_id: ctx.transition_shard_id(),
namespace_id: ctx.namespace_id(),
namespace_name: Arc::clone(&*ctx.namespace_name().get().await),
@ -266,6 +267,7 @@ where
max_sequence_number: SequenceNumber::new(0), // TODO: not ordered!
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
max_l0_created_at: time_now,
};
// Save the compacted data to a parquet file in object storage.

View File

@ -0,0 +1,3 @@
ALTER TABLE
IF EXISTS parquet_file
ADD COLUMN max_l0_created_at BIGINT NOT NULL DEFAULT 0;

View File

@ -2114,6 +2114,7 @@ pub(crate) mod test_helpers {
let min_time = Timestamp::new(10);
let max_time = Timestamp::new(20);
let max_sequence_number = SequenceNumber::new(140);
let max_l0_created_at = Timestamp::new(0);
let parquet_file_params = ParquetFileParams {
shard_id: shard.id,
@ -2129,6 +2130,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at,
};
let parquet_file = repos
.parquet_files()
@ -2345,6 +2347,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos
.parquet_files()
@ -2863,6 +2866,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos
@ -3007,6 +3011,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos
.parquet_files()
@ -3198,6 +3203,7 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let time_now = Timestamp::from(catalog.time_provider().now());
let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));
let time_8_hours_ago = Timestamp::from(catalog.time_provider().hours_ago(8));
let time_38_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(38));
@ -3270,6 +3276,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: time_38_hour_ago,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let delete_l0_file = repos
.parquet_files()
@ -3903,6 +3910,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: time_three_hour_ago,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
// create a deleted L0 file that was created 3 hours ago
@ -4319,6 +4327,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: time_now,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let delete_l0_file = repos
.parquet_files()
@ -4716,6 +4725,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::FileNonOverlapped,
created_at: time_now,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let delete_l1_file = repos
.parquet_files()
@ -4932,6 +4942,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos
@ -5053,6 +5064,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let parquet_file = repos
.parquet_files()
@ -5179,6 +5191,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let p1 = repos
.parquet_files()
@ -5364,6 +5377,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let p1_n1 = repos
.parquet_files()
@ -5499,6 +5513,7 @@ pub(crate) mod test_helpers {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(1),
};
let p1_n2 = repos
.parquet_files()

View File

@ -1689,6 +1689,7 @@ impl ParquetFileRepo for PostgresTxn {
compaction_level,
created_at,
column_set,
max_l0_created_at,
} = parquet_file_params;
let rec = sqlx::query_as::<_, ParquetFile>(
@ -1696,8 +1697,8 @@ impl ParquetFileRepo for PostgresTxn {
INSERT INTO parquet_file (
shard_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING *;
"#,
)
@ -1714,6 +1715,7 @@ RETURNING *;
.bind(created_at) // $11
.bind(namespace_id) // $12
.bind(column_set) // $13
.bind(max_l0_created_at) // $14
.fetch_one(&mut self.inner)
.await
.map_err(|e| {
@ -1776,7 +1778,7 @@ RETURNING *;
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE shard_id = $1
AND max_sequence_number > $2
@ -1802,7 +1804,8 @@ SELECT parquet_file.id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id,
parquet_file.max_sequence_number, parquet_file.min_time,
parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes,
parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at, parquet_file.column_set
parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at, parquet_file.column_set,
parquet_file.max_l0_created_at
FROM parquet_file
INNER JOIN table_name on table_name.id = parquet_file.table_id
WHERE table_name.namespace_id = $1
@ -1822,7 +1825,7 @@ WHERE table_name.namespace_id = $1
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE table_id = $1 AND to_delete IS NULL;
"#,
@ -1882,7 +1885,7 @@ RETURNING id;
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.compaction_level = $2
@ -1909,7 +1912,7 @@ WHERE parquet_file.shard_id = $1
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.table_id = $2
@ -2115,7 +2118,7 @@ LIMIT $4;
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
@ -2250,7 +2253,7 @@ WHERE table_id = $1
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE object_store_id = $1;
"#,
@ -3148,6 +3151,7 @@ mod tests {
compaction_level: CompactionLevel::Initial, // level of file of new writes
created_at: time_now,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: time_now,
};
let f1 = postgres
.repositories()

View File

@ -619,6 +619,7 @@ impl TestPartition {
to_delete,
object_store_id,
row_count,
max_l0_created_at,
} = builder;
let record_batch = record_batch.expect("A record batch is required");
@ -653,6 +654,7 @@ impl TestPartition {
max_sequence_number,
compaction_level: CompactionLevel::Initial,
sort_key: Some(sort_key.clone()),
max_l0_created_at: Time::from_timestamp_nanos(max_l0_created_at),
};
let real_file_size_bytes = create_parquet_file(
ParquetStorage::new(
@ -678,6 +680,7 @@ impl TestPartition {
to_delete,
object_store_id: Some(object_store_id),
row_count: None, // will be computed from the record batch again
max_l0_created_at,
};
let result = self.create_parquet_file_catalog_record(builder).await;
@ -704,6 +707,7 @@ impl TestPartition {
to_delete,
object_store_id,
row_count,
max_l0_created_at,
..
} = builder;
@ -744,6 +748,7 @@ impl TestPartition {
created_at: Timestamp::new(creation_time),
compaction_level,
column_set,
max_l0_created_at: Timestamp::new(max_l0_created_at),
};
let mut repos = self.catalog.catalog.repositories().await;
@ -789,6 +794,7 @@ pub struct TestParquetFileBuilder {
to_delete: bool,
object_store_id: Option<Uuid>,
row_count: Option<usize>,
max_l0_created_at: i64,
}
impl Default for TestParquetFileBuilder {
@ -807,6 +813,7 @@ impl Default for TestParquetFileBuilder {
to_delete: false,
object_store_id: None,
row_count: None,
max_l0_created_at: 1,
}
}
}
@ -863,6 +870,12 @@ impl TestParquetFileBuilder {
self
}
/// Specify the max creation time of all L0 files this file was created from
pub fn with_max_l0_created_at(mut self, time: iox_time::Time) -> Self {
self.max_l0_created_at = time.timestamp_nanos();
self
}
/// Specify the compaction level for the parquet file metadata.
pub fn with_compaction_level(mut self, compaction_level: CompactionLevel) -> Self {
self.compaction_level = compaction_level;

View File

@ -193,6 +193,12 @@ pub trait TimeProvider: std::fmt::Debug + Send + Sync + 'static {
/// Sleep until given time.
fn sleep_until(&self, t: Time) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// Return a time that is the specified number of minutes in the future relative to this
/// provider's `now`.
fn minutes_into_future(&self, minutes: u64) -> Time {
self.now() + Duration::from_secs(60 * minutes)
}
/// Return a time that is the specified number of minutes in the past relative to this
/// provider's `now`.
fn minutes_ago(&self, minutes_ago: u64) -> Time {
@ -611,6 +617,18 @@ mod test {
assert_eq!(min_ago.to_rfc3339(), ago);
}
#[test]
fn test_minutes_into_future() {
let now = "2022-07-07T00:00:00+00:00";
let future = "2022-07-07T00:10:00+00:00";
let provider = MockProvider::new(Time::from_rfc3339(now).unwrap());
let min_future = provider.minutes_into_future(10);
assert_eq!(min_future, Time::from_timestamp_nanos(1657152600000000000));
assert_eq!(min_future.to_rfc3339(), future);
}
#[test]
fn test_hours_ago() {
let now = "2022-07-07T00:00:00+00:00";

View File

@ -297,6 +297,12 @@ pub struct IoxMetadata {
/// Sort key of this chunk
pub sort_key: Option<SortKey>,
/// Max creation timestamp of all the L0 files this file was compacted from.
/// If this metadata is for an L0 file, this value is the same as `creation_timestamp`;
/// if it is for an L1/L2 file, it is the max `creation_timestamp` of all the L0 files
/// compacted into it.
pub max_l0_created_at: Time,
}
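A minimal illustration of how the value documented above could be derived when building the metadata for an L1/L2 file (a sketch under assumptions; the function and the plain i64 nanosecond inputs are hypothetical, not code from this commit):

// Max of the creation timestamps (nanoseconds) of the L0 inputs being compacted,
// e.g. derive_max_l0_created_at(&[10, 30, 20]) == 30.
fn derive_max_l0_created_at(l0_created_at_nanos: &[i64]) -> i64 {
    l0_created_at_nanos
        .iter()
        .copied()
        .max()
        .expect("compaction needs at least one L0 input")
}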
impl IoxMetadata {
@ -341,6 +347,7 @@ impl IoxMetadata {
max_sequence_number: self.max_sequence_number.get(),
sort_key,
compaction_level: self.compaction_level as i32,
max_l0_created_at: Some(self.max_l0_created_at.date_time().into()),
};
let mut buf = Vec::new();
@ -359,6 +366,8 @@ impl IoxMetadata {
// extract creation timestamp
let creation_timestamp =
decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
let max_l0_created_at =
decode_timestamp_from_field(proto_msg.max_l0_created_at, "max_l0_created_at")?;
// extract strings
let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());
@ -395,6 +404,7 @@ impl IoxMetadata {
compaction_level: proto_msg.compaction_level,
},
)?,
max_l0_created_at,
})
}
@ -416,6 +426,7 @@ impl IoxMetadata {
max_sequence_number: SequenceNumber::new(1),
compaction_level: CompactionLevel::Initial,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(creation_timestamp_ns),
}
}
@ -503,6 +514,7 @@ impl IoxMetadata {
row_count: row_count.try_into().expect("row count overflows i64"),
created_at: Timestamp::from(self.creation_timestamp),
column_set: ColumnSet::new(columns),
max_l0_created_at: Timestamp::from(self.max_l0_created_at),
}
}
@ -1001,9 +1013,11 @@ mod tests {
let sort_key = SortKeyBuilder::new().with_col("sort_col").build();
let create_time = Time::from_timestamp(3234, 0).unwrap();
let iox_metadata = IoxMetadata {
object_store_id,
creation_timestamp: Time::from_timestamp(3234, 0).unwrap(),
creation_timestamp: create_time,
namespace_id: NamespaceId::new(2),
namespace_name: Arc::from("hi"),
shard_id: ShardId::new(1),
@ -1014,6 +1028,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(6),
compaction_level: CompactionLevel::Initial,
sort_key: Some(sort_key),
max_l0_created_at: create_time,
};
let proto = iox_metadata.to_protobuf().unwrap();
@ -1038,6 +1053,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(42),
};
let array = StringArray::from_iter([Some("bananas")]);

View File

@ -219,6 +219,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(42),
};
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();

View File

@ -577,6 +577,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(42),
}
}

View File

@ -62,6 +62,7 @@ async fn test_decoded_iox_metadata() {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(42),
};
let mut schema_builder = SchemaBuilder::new();
@ -203,6 +204,7 @@ async fn test_empty_parquet_file_panic() {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(42),
};
let batch = RecordBatch::try_from_iter(data).unwrap();
@ -297,6 +299,7 @@ async fn test_decoded_many_columns_with_null_cols_iox_metadata() {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: Some(sort_key),
max_l0_created_at: Time::from_timestamp_nanos(42),
};
let mut schema_builder = SchemaBuilder::new();
@ -385,6 +388,7 @@ async fn test_derive_parquet_file_params() {
max_sequence_number: SequenceNumber::new(11),
compaction_level: CompactionLevel::FileNonOverlapped,
sort_key: None,
max_l0_created_at: Time::from_timestamp_nanos(1234),
};
// Build a schema that contains the IOx metadata, ensuring it is correctly
@ -433,6 +437,7 @@ async fn test_derive_parquet_file_params() {
assert_eq!(catalog_data.row_count, 3);
assert_eq!(catalog_data.min_time, Timestamp::new(1646917692000000000));
assert_eq!(catalog_data.max_time, Timestamp::new(1653311292000000000));
assert_eq!(catalog_data.max_l0_created_at, Timestamp::new(1234));
}
fn to_string_array(strs: &[&str]) -> ArrayRef {

View File

@ -391,8 +391,8 @@ mod tests {
partition.create_parquet_file(builder).await;
let table_id = table.table.id;
let single_file_size = 224;
let two_file_size = 408;
let single_file_size = 232;
let two_file_size = 424;
assert!(single_file_size < two_file_size);
let cache = make_cache(&catalog);

View File

@ -105,10 +105,10 @@ impl QuerierNamespace {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
rpc_write: bool,
write_rpc: bool,
) -> Self {
let time_provider = catalog_cache.time_provider();
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry, rpc_write));
let chunk_adapter = Arc::new(ChunkAdapter::new(catalog_cache, metric_registry, write_rpc));
let query_log = Arc::new(QueryLog::new(10, time_provider));
let prune_metrics = Arc::new(PruneMetrics::new(&chunk_adapter.metric_registry()));

View File

@ -212,7 +212,7 @@ impl ChunkAdapter {
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let order = if self.rpc_write {
ChunkOrder::new(parquet_file.created_at.get())
ChunkOrder::new(parquet_file.max_l0_created_at.get())
} else {
ChunkOrder::new(parquet_file.max_sequence_number.get())
};

View File

@ -182,6 +182,7 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {
compaction_level: p.compaction_level as i32,
created_at: p.created_at.get(),
column_set: p.column_set.iter().map(|id| id.get()).collect(),
max_l0_created_at: p.max_l0_created_at.get(),
}
}
@ -257,6 +258,7 @@ mod tests {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(2343),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(2343),
};
let p2params = ParquetFileParams {
object_store_id: Uuid::new_v4(),

View File

@ -4,7 +4,9 @@
mod request;
use arrow::{
datatypes::SchemaRef, error::ArrowError, ipc::writer::IpcWriteOptions,
datatypes::{DataType, Field, Schema, SchemaRef},
error::ArrowError,
ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
use arrow_flight::{
@ -732,7 +734,7 @@ impl IOxFlightDataEncoderBuilder {
fn new(schema: SchemaRef) -> Self {
Self {
inner: FlightDataEncoderBuilder::new(),
schema,
schema: prepare_schema_for_flight(schema),
}
}
@ -788,6 +790,29 @@ impl Stream for IOxFlightDataEncoder {
}
}
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Convert dictionary types to underlying types
///
/// See hydrate_dictionary for more information
fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef {
let fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Dictionary(_, value_type) => Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
)
.with_metadata(field.metadata().clone()),
_ => field.clone(),
})
.collect();
Arc::new(Schema::new(fields))
}
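A usage sketch for the helper above, assuming arrow's DataType/Field/Schema are in scope; this test is illustrative and not part of the commit:

#[test]
fn prepare_schema_unpacks_dictionaries() {
    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};

    // One dictionary-encoded tag column next to a plain float column.
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let schema = Arc::new(Schema::new(vec![
        Field::new("tag", dict, true),
        Field::new("value", DataType::Float64, true),
    ]));

    // The prepared schema should expose the dictionary's value type instead.
    let prepared = prepare_schema_for_flight(schema);
    assert_eq!(prepared.field(0).data_type(), &DataType::Utf8);
    assert_eq!(prepared.field(1).data_type(), &DataType::Float64);
}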
impl Stream for GetStream {
type Item = Result<FlightData, tonic::Status>;

View File

@ -154,6 +154,7 @@ mod tests {
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(2343),
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
max_l0_created_at: Timestamp::new(2343),
};
p1 = repos.parquet_files().create(p1params).await.unwrap();

Binary file not shown.