chore: revert fill sort_key_ids
parent fcdf7dc63e
commit 9bf1c8c11c
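This revert removes the column-ID (`sort_key_ids`) plumbing that had been filled in alongside the name-based partition sort key. The central interface change, repeated across the catalog hunks below, is that `PartitionRepo::cas_sort_key` no longer takes a `ColumnSet` of sort-key column IDs. A before/after sketch of that signature, assembled from the hunks in this diff (an excerpt, not compilable on its own):

```rust
// Before the revert: callers passed the column IDs matching the new sort key.
async fn cas_sort_key(
    &mut self,
    partition_id: &TransitionPartitionId,
    old_sort_key: Option<Vec<String>>,
    new_sort_key: &[&str],
    new_sort_key_ids: &ColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>>;

// After the revert: the partition sort key is updated by column name only.
async fn cas_sort_key(
    &mut self,
    partition_id: &TransitionPartitionId,
    old_sort_key: Option<Vec<String>>,
    new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>>;
```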
@@ -42,7 +42,7 @@ use compactor::{
PartitionInfo,
};
use compactor_scheduler::SchedulerConfig;
use data_types::{ColumnSet, ColumnType, CompactionLevel, ParquetFile, TableId};
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
use futures::TryStreamExt;
@@ -93,20 +93,17 @@ impl TestSetupBuilder<false> {
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
let tag1 = table.create_column("tag1", ColumnType::Tag).await;
let tag2 = table.create_column("tag2", ColumnType::Tag).await;
let tag3 = table.create_column("tag3", ColumnType::Tag).await;
let col_time = table.create_column("time", ColumnType::Time).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;

let partition = table.create_partition("2022-07-13").await;

// The sort key comes from the catalog and should be the union of all tags the
// ingester has seen
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let sort_key_col_ids = ColumnSet::from([tag1.id(), tag2.id(), tag3.id(), col_time.id()]);
let partition = partition
.update_sort_key(sort_key.clone(), &sort_key_col_ids)
.await;
let partition = partition.update_sort_key(sort_key.clone()).await;

// Ensure the input scenario conforms to the expected invariants.
let invariant_check = Arc::new(CatalogInvariants {
@@ -90,17 +90,6 @@ impl ColumnsByName {
self.0.values().map(|c| c.id)
}

/// Return column ids of the given column names
/// Will panic if any of the names are not found
pub fn ids_for_names(&self, names: &[&str]) -> ColumnSet {
ColumnSet::from(names.iter().map(|name| {
self.get(name)
.unwrap_or_else(|| panic!("column name not found: {}", name))
.id
.get()
}))
}

/// Get a column by its name.
pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
self.0.get(name)
@@ -342,7 +331,7 @@ impl TryFrom<proto::column_schema::ColumnType> for ColumnType {
}

/// Set of columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, sqlx::Type)]
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
#[sqlx(transparent, no_pg_array)]
pub struct ColumnSet(Vec<ColumnId>);

@@ -382,15 +371,6 @@ impl From<ColumnSet> for Vec<ColumnId> {
}
}

impl<I> From<I> for ColumnSet
where
I: IntoIterator<Item = i64>,
{
fn from(ids: I) -> Self {
Self(ids.into_iter().map(ColumnId::new).collect())
}
}

impl Deref for ColumnSet {
type Target = [ColumnId];

@@ -402,7 +382,6 @@ impl Deref for ColumnSet {
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::collections::BTreeMap;

use super::*;

@@ -471,61 +450,4 @@ mod tests {

ColumnSchema::try_from(&proto).expect_err("should succeed");
}

#[test]
fn test_columns_by_names_exist() {
let columns = build_columns_by_names();

let ids = columns.ids_for_names(&["foo", "bar"]);
assert_eq!(ids, ColumnSet::from([1, 2]));
}

#[test]
fn test_columns_by_names_exist_different_order() {
let columns = build_columns_by_names();

let ids = columns.ids_for_names(&["bar", "foo"]);
assert_eq!(ids, ColumnSet::from([2, 1]));
}

#[test]
#[should_panic = "column name not found: baz"]
fn test_columns_by_names_not_exist() {
let columns = build_columns_by_names();
columns.ids_for_names(&["foo", "baz"]);
}

fn build_columns_by_names() -> ColumnsByName {
let mut columns: BTreeMap<String, ColumnSchema> = BTreeMap::new();
columns.insert(
"foo".to_string(),
ColumnSchema {
id: ColumnId::new(1),
column_type: ColumnType::I64,
},
);
columns.insert(
"bar".to_string(),
ColumnSchema {
id: ColumnId::new(2),
column_type: ColumnType::I64,
},
);
columns.insert(
"time".to_string(),
ColumnSchema {
id: ColumnId::new(3),
column_type: ColumnType::Time,
},
);
columns.insert(
"tag1".to_string(),
ColumnSchema {
id: ColumnId::new(4),
column_type: ColumnType::Tag,
},
);

ColumnsByName(columns)
}
}
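For context, the removed `ids_for_names` helper mapped sort-key column names to their catalog column IDs and panicked on unknown names. A self-contained sketch of that behavior using only std types (the real `ColumnsByName`, `ColumnSchema`, and `ColumnSet` types live in `data_types`; this is an illustration, not the crate's code):

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ColumnId(i64);

/// Simplified stand-in for `ColumnsByName::ids_for_names`: look up each name,
/// panicking if it is unknown, and collect the ids in the requested order.
fn ids_for_names(columns: &BTreeMap<String, ColumnId>, names: &[&str]) -> Vec<ColumnId> {
    names
        .iter()
        .map(|name| {
            *columns
                .get(*name)
                .unwrap_or_else(|| panic!("column name not found: {}", name))
        })
        .collect()
}

fn main() {
    let columns: BTreeMap<String, ColumnId> =
        BTreeMap::from([("foo".to_string(), ColumnId(1)), ("bar".to_string(), ColumnId(2))]);
    // Ids come back in the order the names were requested, as the removed tests asserted.
    assert_eq!(ids_for_names(&columns, &["bar", "foo"]), vec![ColumnId(2), ColumnId(1)]);
}
```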
@@ -1,7 +1,5 @@
//! Types having to do with partitions.

use crate::ColumnSet;

use super::{TableId, Timestamp};

use schema::sort::SortKey;
@@ -360,15 +358,9 @@ pub struct Partition {
pub table_id: TableId,
/// the string key of the partition
pub partition_key: PartitionKey,

// TODO: This `sort_key` will be removed after `sort_key_ids` is fully implemented
/// See comments of [`Partition::sort_key_ids`] for this as it plays the
/// same role but stores column IDs instead of names
pub sort_key: Vec<String>,

/// vector of column IDs that describes how *every* parquet file
/// vector of column names that describes how *every* parquet file
/// in this [`Partition`] is sorted. The sort_key contains all the
/// ID of primary key (PK) columns that have been persisted, and nothing
/// primary key (PK) columns that have been persisted, and nothing
/// else. The PK columns are all `tag` columns and the `time`
/// column.
///
@@ -391,7 +383,7 @@ pub struct Partition {
/// For example, updating `A,B,C` to either `A,D,B,C` or `A,B,C,D`
/// is legal. However, updating to `A,C,D,B` is not because the
/// relative order of B and C has been reversed.
pub sort_key_ids: ColumnSet,
pub sort_key: Vec<String>,

/// The time at which the newest file of the partition is created
pub new_file_at: Option<Timestamp>,
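The doc comment above states the invariant for sort key updates: columns may be added anywhere, but the relative order of columns already in the key must not change. A small self-contained illustration of that rule (not part of the IOx codebase, just a model of the stated constraint):

```rust
/// Returns true if `old` appears in `new` as a subsequence, i.e. the relative
/// order of the existing sort-key columns is preserved.
fn preserves_relative_order(old: &[&str], new: &[&str]) -> bool {
    let mut candidates = new.iter();
    old.iter().all(|o| candidates.any(|n| n == o))
}

fn main() {
    // Legal updates from the doc comment: A,B,C -> A,D,B,C and A,B,C -> A,B,C,D.
    assert!(preserves_relative_order(&["A", "B", "C"], &["A", "D", "B", "C"]));
    assert!(preserves_relative_order(&["A", "B", "C"], &["A", "B", "C", "D"]));
    // Illegal: A,C,D,B reverses the relative order of B and C.
    assert!(!preserves_relative_order(&["A", "B", "C"], &["A", "C", "D", "B"]));
}
```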
@@ -407,7 +399,6 @@ impl Partition {
table_id: TableId,
partition_key: PartitionKey,
sort_key: Vec<String>,
sort_key_ids: ColumnSet,
new_file_at: Option<Timestamp>,
) -> Self {
let hash_id = PartitionHashId::new(table_id, &partition_key);
@@ -417,7 +408,6 @@ impl Partition {
table_id,
partition_key,
sort_key,
sort_key_ids,
new_file_at,
}
}
@@ -434,7 +424,6 @@ impl Partition {
table_id: TableId,
partition_key: PartitionKey,
sort_key: Vec<String>,
sort_key_ids: ColumnSet,
new_file_at: Option<Timestamp>,
) -> Self {
Self {
@@ -443,7 +432,6 @@ impl Partition {
table_id,
partition_key,
sort_key,
sort_key_ids,
new_file_at,
}
}
@@ -50,10 +50,6 @@ message Partition {
repeated string array_sort_key = 6;

PartitionIdentifier identifier = 8;

// the sort key ids sort_key_ids for data in parquet files of this partition which
// is an array of column ids of the sort keys
repeated int64 array_sort_key_ids = 9;
}

message GetPartitionsByTableIdRequest {
@@ -6,9 +6,8 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO,
},
ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName,
NamespaceNameError, ParquetFileParams, Partition, PartitionKey, Statistics, Table, TableId,
Timestamp,
ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError,
ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp,
};
use generated_types::influxdata::iox::catalog::v1 as proto;
// ParquetFile as ProtoParquetFile, Partition as ProtoPartition,
@@ -370,10 +369,8 @@ impl RemoteImporter {
let table_id = table.id;
debug!(%table_id, "Inserting catalog records into table");

// Create a new partition
let partition_key = iox_metadata.partition_key.clone();
let mut partition = self
.create_partition(repos.as_mut(), &table, partition_key)
let partition = self
.partition_for_parquet_file(repos.as_mut(), &table, &iox_metadata)
.await?;

// Note that for some reason, the object_store_id that is
@@ -419,11 +416,6 @@ impl RemoteImporter {
}
};

// Update partition sort key
let partition = self
.update_partition(&mut partition, repos.as_mut(), &table, &iox_metadata)
.await?;

// Now copy the parquet files into the object store
//let partition_id = TransitionPartitionId::Deprecated(partition.id);
let transition_partition_id = partition.transition_partition_id();
@@ -479,40 +471,25 @@ impl RemoteImporter {
Ok(table)
}

/// Create the catalog [`Partition`] into which the specified parquet
/// Return the catalog [`Partition`] into which the specified parquet
/// file should be inserted.
///
/// The sort_key and sort_key_ids of the partition should be empty when it is first created
/// because there are no columns in any parquet files to use for sorting yet.
/// The sort_key and sort_key_ids will be updated after the parquet files are created.
async fn create_partition(
&self,
repos: &mut dyn RepoCollection,
table: &Table,
partition_key: PartitionKey,
) -> Result<Partition> {
let partition = repos
.partitions()
.create_or_get(partition_key, table.id)
.await?;

Ok(partition)
}

/// Update sort keys of the partition
///
/// First attempts to use any available metadata from the
/// catalog export, and falls back to what is in the iox
/// metadata stored in the parquet file, if needed
async fn update_partition(
async fn partition_for_parquet_file(
&self,
partition: &mut Partition,
repos: &mut dyn RepoCollection,
table: &Table,
iox_metadata: &IoxMetadata,
) -> Result<Partition> {
let partition_key = iox_metadata.partition_key.clone();

let partition = repos
.partitions()
.create_or_get(partition_key.clone(), table.id)
.await?;

// Note we use the table_id embedded in the file's metadata
// from the source catalog to match the exported catalog (which
// is different than the new table we just created in the
@@ -521,21 +498,14 @@ impl RemoteImporter {
.exported_contents
.partition_metadata(iox_metadata.table_id.get(), partition_key.inner());

let (new_sort_key, new_sort_key_ids) = if let Some(proto_partition) =
proto_partition.as_ref()
{
let new_sort_key: Vec<&str> = if let Some(proto_partition) = proto_partition.as_ref() {
// Use the sort key from the source catalog
debug!(array_sort_key=?proto_partition.array_sort_key, "Using sort key from catalog export");
let new_sort_key = proto_partition
proto_partition
.array_sort_key
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>();

let new_sort_key_ids =
ColumnSet::from(proto_partition.array_sort_key_ids.iter().cloned());

(new_sort_key, new_sort_key_ids)
.collect()
} else {
warn!("Could not find sort key in catalog metadata export, falling back to embedded metadata");
let sort_key = iox_metadata
@@ -543,13 +513,7 @@ impl RemoteImporter {
.as_ref()
.ok_or_else(|| Error::NoSortKey)?;

let new_sort_key = sort_key.to_columns().collect::<Vec<_>>();

// fetch table columns
let columns = ColumnsByName::new(repos.columns().list_by_table_id(table.id).await?);
let new_sort_key_ids = columns.ids_for_names(&new_sort_key);

(new_sort_key, new_sort_key_ids)
sort_key.to_columns().collect()
};

if !partition.sort_key.is_empty() && partition.sort_key != new_sort_key {
@@ -565,7 +529,6 @@ impl RemoteImporter {
&partition.transition_partition_id(),
Some(partition.sort_key.clone()),
&new_sort_key,
&new_sort_key_ids,
)
.await;

@@ -423,7 +423,6 @@ mod tests {
use arrow_util::assert_batches_eq;
use assert_matches::assert_matches;
use backoff::BackoffConfig;
use data_types::ColumnSet;
use datafusion::{
physical_expr::PhysicalSortExpr,
physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan},
@@ -1007,12 +1006,7 @@ mod tests {
.repositories()
.await
.partitions()
.cas_sort_key(
&partition.transition_partition_id(),
None,
&["terrific"],
&ColumnSet::from([1]),
)
.cas_sort_key(&partition.transition_partition_id(), None, &["terrific"])
.await
.unwrap();

@@ -215,7 +215,6 @@ mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]

use data_types::ColumnSet;
use iox_catalog::mem::MemCatalog;

use super::*;
@@ -288,7 +287,6 @@ mod tests {
ARBITRARY_TABLE_ID,
stored_partition_key.clone(),
vec!["dos".to_string(), "bananas".to_string()],
ColumnSet::from([1, 2]),
Default::default(),
);

@@ -351,7 +349,6 @@ mod tests {
ARBITRARY_PARTITION_KEY.clone(),
Default::default(),
Default::default(),
Default::default(),
);

let cache = new_cache(inner, [partition]);
@@ -389,7 +386,6 @@ mod tests {
ARBITRARY_PARTITION_KEY.clone(),
Default::default(),
Default::default(),
Default::default(),
);

let cache = new_cache(inner, [partition]);
@@ -304,7 +304,6 @@ mod tests {
table_id,
partition_key,
vec![],
Default::default(),
None,
)
}
@@ -388,7 +387,6 @@ mod tests {
ARBITRARY_TABLE_ID,
ARBITRARY_PARTITION_KEY.clone(),
vec![],
Default::default(),
None,
);
let want_id = p.transition_partition_id().clone();
@@ -53,7 +53,6 @@ impl SortKeyResolver {

#[cfg(test)]
mod tests {
use data_types::ColumnSet;
use std::sync::Arc;

use super::*;
@@ -96,7 +95,6 @@ mod tests {
&partition.transition_partition_id(),
None,
&["uno", "dos", "bananas"],
&ColumnSet::from([1, 2, 3]),
)
.await
.expect("should update existing partition key");
@@ -16,7 +16,7 @@ mod tests {
use std::{sync::Arc, time::Duration};

use assert_matches::assert_matches;
use data_types::{ColumnSet, CompactionLevel, ParquetFile};
use data_types::{CompactionLevel, ParquetFile};
use futures::TryStreamExt;
use iox_catalog::{
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@@ -345,10 +345,7 @@ mod tests {
.cas_sort_key(
&partition_id,
None,
// must use column names that exist in the partition data
&["region"],
// column id of region
&ColumnSet::from([2]),
&["bananas", "are", "good", "for", "you"],
)
.await
.expect("failed to set catalog sort key");
@@ -383,11 +380,10 @@ mod tests {
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);

// Assert the sort key was also updated, adding the new columns (time) to the
// Assert the sort key was also updated, adding the new columns to the
// end of the concurrently updated catalog sort key.
assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(Some(p)) => {
// Before there is only ["region"] (manual sort key update above). Now ["region", "time"]
assert_eq!(p.to_columns().collect::<Vec<_>>(), &["region", "time"]);
assert_eq!(p.to_columns().collect::<Vec<_>>(), &["bananas", "are", "good", "for", "you", "region", "time"]);
});

// Ensure a file was made visible in the catalog
@@ -2,7 +2,7 @@ use std::{ops::ControlFlow, sync::Arc};

use async_channel::RecvError;
use backoff::Backoff;
use data_types::{ColumnsByName, CompactionLevel, ParquetFileParams};
use data_types::{CompactionLevel, ParquetFileParams};
use iox_catalog::interface::{get_table_columns_by_id, CasFailure, Catalog};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
@@ -174,15 +174,8 @@ async fn compact_and_upload<O>(
where
O: Send + Sync,
{
// load sort key
let sort_key = ctx.sort_key().get().await;
// fetch column map
// THIS MUST BE DONE AFTER THE SORT KEY IS LOADED
let columns = fetch_column_map(ctx, worker_state, sort_key.as_ref()).await?;

let compacted = compact(ctx, worker_state, sort_key).await;
let (sort_key_update, parquet_table_data) =
upload(ctx, worker_state, compacted, &columns).await;
let compacted = compact(ctx, worker_state).await;
let (sort_key_update, parquet_table_data) = upload(ctx, worker_state, compacted).await;

if let Some(update) = sort_key_update {
update_catalog_sort_key(
@@ -190,7 +183,6 @@ where
worker_state,
update,
parquet_table_data.object_store_id,
&columns,
)
.await?
}
@@ -200,14 +192,12 @@ where

/// Compact the data in `ctx` using sorted by the sort key returned from
/// [`Context::sort_key()`].
async fn compact<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
sort_key: Option<SortKey>,
) -> CompactedStream
async fn compact<O>(ctx: &Context, worker_state: &SharedWorkerState<O>) -> CompactedStream
where
O: Send + Sync,
{
let sort_key = ctx.sort_key().get().await;

debug!(
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
@@ -241,7 +231,6 @@ async fn upload<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
compacted: CompactedStream,
columns: &ColumnsByName,
) -> (Option<SortKey>, ParquetFileParams)
where
O: Send + Sync,
@@ -305,6 +294,15 @@ where
"partition parquet uploaded"
);

// Read the table's columns from the catalog to get a map of column name -> column IDs.
let columns = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = worker_state.catalog.repositories().await;
get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
})
.await
.expect("retry forever");

// Build the data that must be inserted into the parquet_files catalog
// table in order to make the file visible to queriers.
let parquet_table_data =
@@ -323,44 +321,6 @@ where
(catalog_sort_key_update, parquet_table_data)
}

/// Fetch the table column map from the catalog and verify if they contain all columns in the sort key
async fn fetch_column_map<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
// NOTE: CALLER MUST LOAD SORT KEY BEFORE CALLING THIS FUNCTION EVEN IF THE sort key IS NONE
// THIS IS A MUST TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY
// The purpose to put the sort_key as a param here is to make sure the caller has already loaded the sort key
sort_key: Option<&SortKey>,
) -> Result<ColumnsByName, PersistError>
where
O: Send + Sync,
{
// Read the table's columns from the catalog to get a map of column name -> column IDs.
let column_map = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = worker_state.catalog.repositories().await;
get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
})
.await
.expect("retry forever");

// Verify that the sort key columns are in the column map
if let Some(sort_key) = sort_key {
for sort_key_column in sort_key.to_columns() {
if !column_map.contains_column_name(sort_key_column) {
panic!(
"sort key column {} of partition id {} is not in the column map {:?}",
sort_key_column,
ctx.partition_id(),
column_map
);
}
}
}

Ok(column_map)
}

/// Update the sort key value stored in the catalog for this [`Context`].
///
/// # Concurrent Updates
@@ -373,7 +333,6 @@ async fn update_catalog_sort_key<O>(
worker_state: &SharedWorkerState<O>,
new_sort_key: SortKey,
object_store_id: Uuid,
columns: &ColumnsByName,
) -> Result<(), PersistError>
where
O: Send + Sync,
@@ -401,19 +360,13 @@ where
.retry_with_backoff("cas_sort_key", || {
let old_sort_key = old_sort_key.clone();
let new_sort_key_str = new_sort_key.to_columns().collect::<Vec<_>>();
let new_sort_key_colids = columns.ids_for_names(&new_sort_key_str);
let catalog = Arc::clone(&worker_state.catalog);
let ctx = &ctx;
async move {
let mut repos = catalog.repositories().await;
match repos
.partitions()
.cas_sort_key(
ctx.partition_id(),
old_sort_key.clone(),
&new_sort_key_str,
&new_sort_key_colids,
)
.cas_sort_key(ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str)
.await
{
Ok(_) => ControlFlow::Break(Ok(())),
@@ -436,8 +389,7 @@ where
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
?observed,
update_sort_key=?new_sort_key_str,
update_sort_key_ids=?new_sort_key_colids,
update=?new_sort_key_str,
"detected matching concurrent sort key update"
);
ControlFlow::Break(Ok(()))
@@ -463,8 +415,7 @@ where
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
?observed,
update_sort_key=?new_sort_key_str,
update_sort_key_ids=?new_sort_key_colids,
update=?new_sort_key_str,
"detected concurrent sort key update, regenerating parquet"
);
// Stop the retry loop with an error containing the
@@ -3,10 +3,10 @@
use async_trait::async_trait;
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Column, ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId,
NamespaceName, NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey,
SkippedCompaction, Table, TableId, TableSchema, Timestamp, TransitionPartitionId,
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, TableSchema, Timestamp, TransitionPartitionId,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@@ -413,7 +413,6 @@ pub trait PartitionRepo: Send + Sync {
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &ColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>>;

/// Record an instance of a partition being selected for compaction but compaction was not
@@ -1599,24 +1598,19 @@ pub(crate) mod test_helpers {

assert_eq!(created.keys().copied().collect::<BTreeSet<_>>(), listed);

// sort_key and sort_key_ids should be empty on creation
// sort_key should be empty on creation
assert!(to_skip_partition.sort_key.is_empty());
assert!(to_skip_partition.sort_key_ids.is_empty());

// test update_sort_key from None to Some
let partition = repos
repos
.partitions()
.cas_sort_key(
&to_skip_partition.transition_partition_id(),
None,
&["tag2", "tag1", "time"],
&ColumnSet::from([1, 2, 3]),
)
.await
.unwrap();
// verify sort key and sort key ids are updated
assert_eq!(partition.sort_key, &["tag2", "tag1", "time"]);
assert_eq!(partition.sort_key_ids, ColumnSet::from([1, 2, 3]));

// test sort key CAS with an incorrect value
let err = repos
@@ -1625,7 +1619,6 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
&ColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@@ -1662,7 +1655,6 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
None,
&["tag2", "tag1", "tag3 , with comma", "time"],
&ColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@@ -1677,7 +1669,6 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
&ColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@@ -1697,7 +1688,6 @@ pub(crate) mod test_helpers {
.collect(),
),
&["tag2", "tag1", "tag3 , with comma", "time"],
&ColumnSet::from([1, 2, 3, 4]),
)
.await
.unwrap();
@@ -16,10 +16,10 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
},
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, Timestamp, TransitionPartitionId,
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use snafu::ensure;
@@ -566,7 +566,6 @@ impl PartitionRepo for MemTxn {
table_id,
key,
vec![],
Default::default(),
None,
);
stage.partitions.push(p);
@@ -662,15 +661,7 @@ impl PartitionRepo for MemTxn {
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &ColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
// panic if new_sort_key and new_sort_key_ids have different lengths
assert_eq!(
new_sort_key.len(),
new_sort_key_ids.len(),
"new_sort_key and new_sort_key_ids must be the same length"
);

let stage = self.stage();
let old_sort_key = old_sort_key.unwrap_or_default();

@@ -682,7 +673,6 @@ impl PartitionRepo for MemTxn {
}) {
Some(p) if p.sort_key == old_sort_key => {
p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect();
p.sort_key_ids = new_sort_key_ids.clone();
Ok(p.clone())
}
Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())),
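The in-memory implementation above shows the compare-and-swap contract that `cas_sort_key` keeps after the revert: the update applies only if the stored sort key still equals the caller's observed value, otherwise the current value is returned so the caller can re-read and retry. A self-contained sketch of that contract (illustrative only; the real code lives in `iox_catalog`):

```rust
#[derive(Debug, PartialEq)]
enum CasFailure {
    ValueMismatch(Vec<String>),
}

fn cas_sort_key(
    stored: &mut Vec<String>,
    observed: Option<Vec<String>>,
    new: &[&str],
) -> Result<Vec<String>, CasFailure> {
    let observed = observed.unwrap_or_default();
    if *stored == observed {
        *stored = new.iter().map(|s| s.to_string()).collect();
        Ok(stored.clone())
    } else {
        Err(CasFailure::ValueMismatch(stored.clone()))
    }
}

fn main() {
    let mut stored: Vec<String> = vec![];
    // First write: the partition has no sort key yet, so the expected old value is None.
    cas_sort_key(&mut stored, None, &["tag2", "tag1", "time"]).unwrap();
    // A writer with a stale view loses the race and gets the current value back.
    let err = cas_sort_key(&mut stored, Some(vec!["bananas".into()]), &["time"]);
    assert!(matches!(err, Err(CasFailure::ValueMismatch(_))));
}
```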
@@ -7,7 +7,7 @@ use crate::interface::{
use async_trait::async_trait;
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
@@ -176,7 +176,7 @@ decorate!(
"partition_get_by_hash_id_batch" = get_by_hash_id_batch(&mut self, partition_hash_ids: &[&PartitionHashId]) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str], new_sort_key_ids: &ColumnSet) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
@@ -20,7 +20,7 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
},
Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
@@ -1163,12 +1163,12 @@ impl PartitionRepo for PostgresTxn {
let v = sqlx::query_as::<_, Partition>(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids)
(partition_key, shard_id, table_id, hash_id, sort_key)
VALUES
( $1, $2, $3, $4, '{}', '{}')
( $1, $2, $3, $4, '{}')
ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(key) // $1
@@ -1191,7 +1191,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
async fn get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE id = $1;
"#,
@@ -1214,7 +1214,7 @@ WHERE id = $1;

sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE id = ANY($1);
"#,
@@ -1231,7 +1231,7 @@ WHERE id = ANY($1);
) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hash_id = $1;
"#,
@@ -1257,7 +1257,7 @@ WHERE hash_id = $1;

sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hash_id = ANY($1);
"#,
@@ -1271,7 +1271,7 @@ WHERE hash_id = ANY($1);
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE table_id = $1;
"#,
@@ -1305,42 +1305,32 @@ WHERE table_id = $1;
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &ColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
// panic if new_sort_key and new_sort_key_ids have different lengths
assert_eq!(
new_sort_key.len(),
new_sort_key_ids.len(),
"new_sort_key and new_sort_key_ids must be the same length"
);

let old_sort_key = old_sort_key.unwrap_or_default();
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1, sort_key_ids = $4
SET sort_key = $1
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(hash_id) // $2
.bind(&old_sort_key) // $3
.bind(new_sort_key_ids), // $4
.bind(&old_sort_key), // $3
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1, sort_key_ids = $4
SET sort_key = $1
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(id) // $2
.bind(&old_sort_key) // $3
.bind(new_sort_key_ids), // $4
.bind(&old_sort_key), // $3
};

let res = query.fetch_one(&mut self.inner).await;
@@ -1376,7 +1366,6 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
?partition_id,
?old_sort_key,
?new_sort_key,
?new_sort_key_ids,
"partition sort key cas successful"
);

@@ -1475,7 +1464,7 @@ RETURNING *
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>> {
sqlx::query_as(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, persisted_sequence_number, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, persisted_sequence_number, new_file_at
FROM partition
ORDER BY id DESC
LIMIT $1;"#,
@@ -2205,12 +2194,12 @@ mod tests {
sqlx::query(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, sort_key, sort_key_ids)
(partition_key, shard_id, table_id, sort_key)
VALUES
( $1, $2, $3, '{}', '{}')
( $1, $2, $3, '{}')
ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(&key) // $1
@@ -818,7 +818,6 @@ struct PartitionPod {
table_id: TableId,
partition_key: PartitionKey,
sort_key: Json<Vec<String>>,
sort_key_ids: Json<Vec<i64>>,
new_file_at: Option<Timestamp>,
}

@@ -830,7 +829,6 @@ impl From<PartitionPod> for Partition {
value.table_id,
value.partition_key,
value.sort_key.0,
ColumnSet::from(value.sort_key_ids.0),
value.new_file_at,
)
}
@@ -848,12 +846,12 @@ impl PartitionRepo for SqliteTxn {
let v = sqlx::query_as::<_, PartitionPod>(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids)
(partition_key, shard_id, table_id, hash_id, sort_key)
VALUES
($1, $2, $3, $4, '[]', '[]')
($1, $2, $3, $4, '[]')
ON CONFLICT (table_id, partition_key)
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(key) // $1
@@ -876,7 +874,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
async fn get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE id = $1;
"#,
@@ -900,7 +898,7 @@ WHERE id = $1;

sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE id IN (SELECT value FROM json_each($1));
"#,
@@ -918,7 +916,7 @@ WHERE id IN (SELECT value FROM json_each($1));
) -> Result<Option<Partition>> {
let rec = sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hash_id = $1;
"#,
@@ -956,7 +954,7 @@ WHERE hash_id = $1;

sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE hex(hash_id) IN (SELECT value FROM json_each($1));
"#,
@@ -971,7 +969,7 @@ WHERE hex(hash_id) IN (SELECT value FROM json_each($1));
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
WHERE table_id = $1;
"#,
@@ -1008,44 +1006,33 @@ WHERE table_id = $1;
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &ColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
// panic if new_sort_key and new_sort_key_ids have different lengths
assert_eq!(
new_sort_key.len(),
new_sort_key_ids.len(),
"new_sort_key and new_sort_key_ids must be the same length"
);

let old_sort_key = old_sort_key.unwrap_or_default();
let raw_new_sort_key_ids: Vec<_> = new_sort_key_ids.iter().map(|cid| cid.get()).collect();

// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1, sort_key_ids = $4
SET sort_key = $1
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(hash_id) // $2
.bind(Json(&old_sort_key)) // $3
.bind(Json(&raw_new_sort_key_ids)), // $4
.bind(Json(&old_sort_key)), // $3
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1, sort_key_ids = $4
SET sort_key = $1
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(id) // $2
.bind(Json(&old_sort_key)) // $3
.bind(Json(&raw_new_sort_key_ids)), // $4
.bind(Json(&old_sort_key)), // $3
};

let res = query.fetch_one(self.inner.get_mut()).await;
@@ -1081,7 +1068,6 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
?partition_id,
?old_sort_key,
?new_sort_key,
?new_sort_key_ids,
"partition sort key cas successful"
);

@@ -1178,7 +1164,7 @@ RETURNING *
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"
SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at
SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
FROM partition
ORDER BY id DESC
LIMIT $1;
@@ -1789,12 +1775,12 @@ mod tests {
sqlx::query(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, sort_key, sort_key_ids)
(partition_key, shard_id, table_id, sort_key)
VALUES
($1, $2, $3, '[]', '[]')
($1, $2, $3, '[]')
ON CONFLICT (table_id, partition_key)
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(&key) // $1
@@ -161,7 +161,6 @@ impl PartitionBuilder {
TableId::new(0),
PartitionKey::from("key"),
vec![],
Default::default(),
None,
),
}
@@ -318,7 +318,6 @@ impl TestTable {
self: &Arc<Self>,
key: &str,
sort_key: &[&str],
sort_key_ids: &[i64],
) -> Arc<TestPartition> {
let mut repos = self.catalog.catalog.repositories().await;

@@ -334,7 +333,6 @@ impl TestTable {
&TransitionPartitionId::Deprecated(partition.id),
None,
sort_key,
&ColumnSet::from(sort_key_ids.iter().cloned()),
)
.await
.unwrap();
@@ -427,12 +425,6 @@ pub struct TestColumn {
pub column: Column,
}

impl TestColumn {
pub fn id(&self) -> i64 {
self.column.id.get()
}
}

/// A test catalog with specified namespace, table, partition
#[allow(missing_docs)]
#[derive(Debug)]
@@ -445,11 +437,7 @@ pub struct TestPartition {

impl TestPartition {
/// Update sort key.
pub async fn update_sort_key(
self: &Arc<Self>,
sort_key: SortKey,
sort_key_ids: &ColumnSet,
) -> Arc<Self> {
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
let old_sort_key = partition_lookup(
self.catalog.catalog.repositories().await.as_mut(),
&self.partition.transition_partition_id(),
@@ -469,7 +457,6 @@ impl TestPartition {
&self.partition.transition_partition_id(),
Some(old_sort_key),
&sort_key.to_columns().collect::<Vec<_>>(),
sort_key_ids,
)
.await
.unwrap();
@@ -788,11 +775,6 @@ async fn update_catalog_sort_key_if_needed<R>(
// Fetch the latest partition info from the catalog
let partition = partition_lookup(repos, id).await.unwrap().unwrap();

// fetch column ids from catalog
let columns = get_table_columns_by_id(partition.table_id, repos)
.await
.unwrap();

// Similarly to what the ingester does, if there's an existing sort key in the catalog, add new
// columns onto the end
match partition.sort_key() {
@@ -806,9 +788,6 @@ async fn update_catalog_sort_key_if_needed<R>(
catalog_sort_key.to_columns().collect::<Vec<_>>(),
&new_columns,
);

let column_ids = columns.ids_for_names(&new_columns);

repos
.partitions()
.cas_sort_key(
@@ -820,7 +799,6 @@ async fn update_catalog_sort_key_if_needed<R>(
.collect::<Vec<_>>(),
),
&new_columns,
&column_ids,
)
.await
.unwrap();
@@ -829,10 +807,9 @@ async fn update_catalog_sort_key_if_needed<R>(
None => {
let new_columns = sort_key.to_columns().collect::<Vec<_>>();
debug!("Updating sort key from None to {:?}", &new_columns);
let column_ids = columns.ids_for_names(&new_columns);
repos
.partitions()
.cas_sort_key(id, None, &new_columns, &column_ids)
.cas_sort_key(id, None, &new_columns)
.await
.unwrap();
}
@@ -390,8 +390,8 @@ mod tests {
};
use async_trait::async_trait;
use data_types::{
partition_template::TablePartitionTemplateOverride, ColumnSet, ColumnType, PartitionId,
PartitionKey, TableId,
partition_template::TablePartitionTemplateOverride, ColumnType, PartitionId, PartitionKey,
TableId,
};
use futures::StreamExt;
use generated_types::influxdata::iox::partition_template::v1::{
@@ -410,7 +410,7 @@ mod tests {
let c1 = t.create_column("tag", ColumnType::Tag).await;
let c2 = t.create_column("time", ColumnType::Time).await;
let p1 = t
.create_partition_with_sort_key("k1", &["tag", "time"], &[c1.id(), c2.id()])
.create_partition_with_sort_key("k1", &["tag", "time"])
.await
.partition
.clone();
@@ -865,10 +865,10 @@ mod tests {

// set sort key
let p = p
.update_sort_key(
SortKey::from_columns([c1.column.name.as_str(), c2.column.name.as_str()]),
&ColumnSet::from([c1.column.id.get(), c2.column.id.get()]),
)
.update_sort_key(SortKey::from_columns([
c1.column.name.as_str(),
c2.column.name.as_str(),
]))
.await;
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 1);

@@ -1110,12 +1110,11 @@ mod tests {
partition_template: TablePartitionTemplateOverride::default(),
});
const N_PARTITIONS: usize = 20;
let c_id = c.column.id.get();
let mut partitions = futures::stream::iter(0..N_PARTITIONS)
.then(|i| {
let t = Arc::clone(&t);
async move {
t.create_partition_with_sort_key(&format!("p{i}"), &["time"], &[c_id])
t.create_partition_with_sort_key(&format!("p{i}"), &["time"])
.await
.partition
.transition_partition_id()
@@ -105,7 +105,7 @@ pub mod tests {
use super::*;
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow_util::assert_batches_eq;
use data_types::{ColumnSet, ColumnType, ParquetFile};
use data_types::{ColumnType, ParquetFile};
use datafusion_util::config::register_iox_object_store;
use iox_query::{
exec::{ExecutorType, IOxSessionContext},
@@ -186,20 +186,17 @@ pub mod tests {
.join("\n");
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
let tag1 = table.create_column("tag1", ColumnType::Tag).await;
let tag2 = table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
let tag4 = table.create_column("tag4", ColumnType::Tag).await;
table.create_column("tag4", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("field_float", ColumnType::F64).await;
let col_time = table.create_column("time", ColumnType::Time).await;
table.create_column("time", ColumnType::Time).await;
let partition = table
.create_partition("part")
.await
.update_sort_key(
SortKey::from_columns(["tag1", "tag2", "tag4", "time"]),
&ColumnSet::from([tag1.id(), tag2.id(), tag4.id(), col_time.id()]),
)
.update_sort_key(SortKey::from_columns(["tag1", "tag2", "tag4", "time"]))
.await;
let builder = TestParquetFileBuilder::default().with_line_protocol(&lp);
let parquet_file = Arc::new(partition.create_parquet_file(builder).await.parquet_file);
@@ -237,7 +237,6 @@ fn to_partition(p: data_types::Partition) -> Partition {
key: p.partition_key.to_string(),
table_id: p.table_id.get(),
array_sort_key: p.sort_key,
array_sort_key_ids: p.sort_key_ids.iter().map(|id| id.get()).collect(),
}
}