Merge pull request #4206 from influxdata/cn/sort-key-catalog

feat: Add optional sort_key column to partition table in the catalog
pull/24376/head
kodiakhq[bot] 2022-04-04 17:02:56 +00:00 committed by GitHub
commit f1799d836f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 109 additions and 10 deletions

View File

@ -652,6 +652,9 @@ pub struct Partition {
pub table_id: TableId, pub table_id: TableId,
/// the string key of the partition /// the string key of the partition
pub partition_key: String, pub partition_key: String,
/// The sort key for the partition. Should be computed on the first persist operation for
/// this partition and updated if new tag columns are added.
pub sort_key: Option<String>,
} }
/// Information for a partition from the catalog. /// Information for a partition from the catalog.

View File

@ -0,0 +1,4 @@
ALTER TABLE
IF EXISTS partition
ADD
COLUMN sort_key VARCHAR;

View File

@ -45,6 +45,9 @@ pub enum Error {
#[snafu(display("table {} not found", id))] #[snafu(display("table {} not found", id))]
TableNotFound { id: TableId }, TableNotFound { id: TableId },
#[snafu(display("partition {} not found", id))]
PartitionNotFound { id: PartitionId },
#[snafu(display( #[snafu(display(
"couldn't create column {} in table {}; limit reached on namespace", "couldn't create column {} in table {}; limit reached on namespace",
column_name, column_name,
@ -403,8 +406,8 @@ pub trait SequencerRepo: Send + Sync {
) -> Result<()>; ) -> Result<()>;
} }
/// Functions for working with IOx partitions in the catalog. Note that these are how /// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up
/// IOx splits up data within a database, which is differenet than Kafka partitions. /// data within a database, which is differenet than Kafka partitions.
#[async_trait] #[async_trait]
pub trait PartitionRepo: Send + Sync { pub trait PartitionRepo: Send + Sync {
/// create or get a partition record for the given partition key, sequencer and table /// create or get a partition record for the given partition key, sequencer and table
@ -424,11 +427,19 @@ pub trait PartitionRepo: Send + Sync {
/// return partitions for a given namespace /// return partitions for a given namespace
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>; async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
/// return the partition record, the namespace name it belongs to, and the table name it is under /// return the partition record, the namespace name it belongs to, and the table name it is
/// under
async fn partition_info_by_id( async fn partition_info_by_id(
&mut self, &mut self,
partition_id: PartitionId, partition_id: PartitionId,
) -> Result<Option<PartitionInfo>>; ) -> Result<Option<PartitionInfo>>;
/// Update the sort key for the partition
async fn update_sort_key(
&mut self,
partition_id: PartitionId,
sort_key: &str,
) -> Result<Partition>;
} }
/// Functions for working with tombstones in the catalog /// Functions for working with tombstones in the catalog
@ -1316,9 +1327,50 @@ pub(crate) mod test_helpers {
let expected: BTreeMap<_, _> = created let expected: BTreeMap<_, _> = created
.iter() .iter()
.map(|(k, v)| (*k, v.clone())) .map(|(k, v)| (*k, v.clone()))
.chain(std::iter::once((other_partition.id, other_partition))) .chain(std::iter::once((
other_partition.id,
other_partition.clone(),
)))
.collect(); .collect();
assert_eq!(expected, listed); assert_eq!(expected, listed);
// sort_key should be None on creation
assert_eq!(other_partition.sort_key, None);
// test update_sort_key from None to Some
repos
.partitions()
.update_sort_key(other_partition.id, "tag2,tag1,time")
.await
.unwrap();
// test getting the new sort key
let updated_other_partition = repos
.partitions()
.get_by_id(other_partition.id)
.await
.unwrap()
.unwrap();
assert_eq!(updated_other_partition.sort_key.unwrap(), "tag2,tag1,time");
// test update_sort_key from Some value to Some other value
repos
.partitions()
.update_sort_key(other_partition.id, "tag2,tag1,tag3,time")
.await
.unwrap();
// test getting the new sort key
let updated_other_partition = repos
.partitions()
.get_by_id(other_partition.id)
.await
.unwrap()
.unwrap();
assert_eq!(
updated_other_partition.sort_key.unwrap(),
"tag2,tag1,tag3,time"
);
} }
async fn test_tombstone(catalog: Arc<dyn Catalog>) { async fn test_tombstone(catalog: Arc<dyn Catalog>) {

View File

@ -692,6 +692,7 @@ impl PartitionRepo for MemTxn {
sequencer_id, sequencer_id,
table_id, table_id,
partition_key: key.to_string(), partition_key: key.to_string(),
sort_key: None,
}; };
stage.partitions.push(p); stage.partitions.push(p);
stage.partitions.last().unwrap() stage.partitions.last().unwrap()
@ -776,6 +777,21 @@ impl PartitionRepo for MemTxn {
Ok(None) Ok(None)
} }
async fn update_sort_key(
&mut self,
partition_id: PartitionId,
sort_key: &str,
) -> Result<Partition> {
let stage = self.stage();
match stage.partitions.iter_mut().find(|p| p.id == partition_id) {
Some(p) => {
p.sort_key = Some(sort_key.to_string());
Ok(p.clone())
}
None => Err(Error::PartitionNotFound { id: partition_id }),
}
}
} }
#[async_trait] #[async_trait]

View File

@ -244,6 +244,7 @@ decorate!(
"partition_list_by_sequencer" = list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>>; "partition_list_by_sequencer" = list_by_sequencer(&mut self, sequencer_id: SequencerId) -> Result<Vec<Partition>>;
"partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>; "partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
"partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result<Option<PartitionInfo>>; "partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result<Option<PartitionInfo>>;
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &str) -> Result<Partition>;
] ]
); );

View File

@ -1157,10 +1157,7 @@ RETURNING *;
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> { async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>( sqlx::query_as::<_, Partition>(
r#" r#"
SELECT partition.id as id, SELECT partition.*
partition.sequencer_id as sequencer_id,
partition.table_id as table_id,
partition.partition_key as partition_key
FROM table_name FROM table_name
INNER JOIN partition on partition.table_id = table_name.id INNER JOIN partition on partition.table_id = table_name.id
WHERE table_name.namespace_id = $1; WHERE table_name.namespace_id = $1;
@ -1178,8 +1175,7 @@ WHERE table_name.namespace_id = $1;
) -> Result<Option<PartitionInfo>> { ) -> Result<Option<PartitionInfo>> {
let info = sqlx::query( let info = sqlx::query(
r#" r#"
SELECT namespace.name as namespace_name, table_name.name as table_name, partition.id, SELECT namespace.name as namespace_name, table_name.name as table_name, partition.*
partition.sequencer_id, partition.table_id, partition.partition_key
FROM partition FROM partition
INNER JOIN table_name on table_name.id = partition.table_id INNER JOIN table_name on table_name.id = partition.table_id
INNER JOIN namespace on namespace.id = table_name.namespace_id INNER JOIN namespace on namespace.id = table_name.namespace_id
@ -1198,6 +1194,7 @@ WHERE partition.id = $1;
sequencer_id: info.get("sequencer_id"), sequencer_id: info.get("sequencer_id"),
table_id: info.get("table_id"), table_id: info.get("table_id"),
partition_key: info.get("partition_key"), partition_key: info.get("partition_key"),
sort_key: info.get("sort_key"),
}; };
Ok(Some(PartitionInfo { Ok(Some(PartitionInfo {
@ -1206,6 +1203,32 @@ WHERE partition.id = $1;
partition, partition,
})) }))
} }
async fn update_sort_key(
&mut self,
partition_id: PartitionId,
sort_key: &str,
) -> Result<Partition> {
let rec = sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1
WHERE id = $2
RETURNING *;
"#,
)
.bind(&sort_key)
.bind(&partition_id)
.fetch_one(&mut self.inner)
.await;
let partition = rec.map_err(|e| match e {
sqlx::Error::RowNotFound => Error::PartitionNotFound { id: partition_id },
_ => Error::SqlxError { source: e },
})?;
Ok(partition)
}
} }
#[async_trait] #[async_trait]