Merge pull request #7049 from influxdata/cn/delete-experiments

fix: Small remaining cleanups from the big deletion PR
pull/24376/head
Dom 2023-05-09 10:23:55 +01:00 committed by GitHub
commit e60babfc06
17 changed files with 23 additions and 150 deletions

View File

@ -225,14 +225,6 @@ jobs:
# When removing this, also remove the ignore on the test in trogging/src/cli.rs
RUST_LOG: debug,,hyper::proto::h1=info,h2=info
LOG_FILTER: debug,,hyper::proto::h1=info,h2=info
# TEMPORARY: Can be removed when the ingester that uses the write buffer is removed. Tests
# need to spin up separate servers because I've only been able to implement a "persist
# everything" API, and if tests run in parallel using a shared server, they interfere with
# each other. Starting separate servers with the maximum number of Rust test threads uses up
# all the Postgres connections in CI, so limit the parallelization until we switch completely
# to ingester2, which does have a "persist-per-namespace" API that means tests can run on
# shared MiniClusters.
RUST_TEST_THREADS: 8
# Run the JDBC tests
TEST_INFLUXDB_JDBC: "true"
steps:

Cargo.lock generated
View File

@ -2443,8 +2443,6 @@ dependencies = [
"client_util",
"data_types",
"futures",
"generated_types",
"influxdb_iox_client",
"iox_catalog",
"metric",
"object_store",
@ -2455,7 +2453,6 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"workspace-hack",
]
@ -2597,7 +2594,6 @@ dependencies = [
"schema",
"serde",
"serde_json",
"sharder",
"snafu",
"tempfile",
"test_helpers",
@ -2975,7 +2971,6 @@ dependencies = [
"parquet_file",
"predicate",
"schema",
"sharder",
"uuid",
"workspace-hack",
]

View File

@ -19,7 +19,6 @@ observability_deps = { path = "../observability_deps" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.96"
snafu = "0.7"
tempfile = "3.5.0"
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
@ -27,6 +26,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tempfile = "3.5.0"
test_helpers = { path = "../test_helpers" }
[features]

View File

@ -726,8 +726,8 @@ impl sqlx::Decode<'_, sqlx::Sqlite> for PartitionKey {
}
}
/// Data object for a partition. The combination of shard, table and key are unique (i.e. only
/// one record can exist for each combo)
/// Data object for a partition. The combination of table and key are unique (i.e. only one record
/// can exist for each combo)
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct Partition {
/// the id of the partition
@ -1849,10 +1849,9 @@ pub const MIN_NANO_TIME: i64 = i64::MIN + 2;
///
/// 2262-04-11 23:47:16.854775806 +0000 UTC
///
/// The highest time represented by a nanosecond needs to be used for an
/// exclusive range in the shard group, so the maximum time needs to be one
/// less than the possible maximum number of nanoseconds representable by an
/// int64 so that we don't lose a point at that one time.
/// The highest time represented by a nanosecond needs to be used for an exclusive range, so the
/// maximum time needs to be one less than the possible maximum number of nanoseconds representable
/// by an int64 so that we don't lose a point at that one time.
/// Source: [influxdb](https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34)
pub const MAX_NANO_TIME: i64 = i64::MAX - 1;
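
To make the exclusive-range reasoning in the comment above concrete, here is a small hedged Rust sketch (illustration only, not part of this diff); the `in_valid_range` helper is hypothetical, while the two constants mirror the ones shown in the hunk.

```rust
// Illustration only (not part of this diff): why MAX_NANO_TIME is i64::MAX - 1.
// An exclusive upper bound of MAX_NANO_TIME + 1 must still fit in an i64, so the
// last valid point timestamp sits one below i64::MAX. Constants mirror the hunk above.
pub const MIN_NANO_TIME: i64 = i64::MIN + 2;
pub const MAX_NANO_TIME: i64 = i64::MAX - 1;

fn in_valid_range(ts: i64) -> bool {
    // Half-open range [MIN_NANO_TIME, MAX_NANO_TIME + 1); the +1 cannot overflow
    // precisely because MAX_NANO_TIME is one less than i64::MAX.
    (MIN_NANO_TIME..MAX_NANO_TIME + 1).contains(&ts)
}

fn main() {
    assert!(in_valid_range(MAX_NANO_TIME));
    assert!(!in_valid_range(i64::MAX));
    println!("valid nanosecond range: [{}, {}]", MIN_NANO_TIME, MAX_NANO_TIME);
}
```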

View File

@ -1,6 +1,6 @@
# Job of a Compactor
Compactor is one of the servers in an IOx cluster and its main job is to compact many small and time-overlapped files into larger and non-time-overlapped files. Duplicated and soft deleted data will also be removed during compaction. There may be one or many Compactors in a cluster, each will be responsible for compacting files of a set of specified shards.
Compactor is one of the servers in an IOx cluster and its main job is to compact many small and time-overlapped files into larger and non-time-overlapped files. Duplicated and soft deleted data will also be removed during compaction. There may be one or many Compactors in a cluster.
- The purpose of compaction is to increase query performance by
1. Avoiding reading too many small files
@ -46,8 +46,7 @@ If increasing memory a lot does not help, consider changing one or a combination
These are [up-to-date configurable parameters](https://github.com/influxdata/influxdb_iox/blob/main/clap_blocks/src/compactor.rs). Here are a few key parameters you may want to tune for your needs:
- **Size of the files:** The compactor cannot control the sizes of level-0 files but they are usually small and can be adjusted by config params of the Ingesters. The compactor decides the max desired size of level-1 and level-2 files which is around `INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES * (100 + INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE) / 100`.
- **Map a compactor to several shards:** Depending on your Ingester setup, there may be several shards. A compactor can be set up to compact all or a fraction of the shards. Use range `[INFLUXDB_IOX_SHARD_INDEX_RANGE_START, INFLUXDB_IOX_SHARD_INDEX_RANGE_END]` to map them.
- **Number of partitions considered to compact per shard:** If there is enough memory, which is usually the case, the compactor will compact many partitions of the same or different shards concurrently. Depending on how many shards a compactor handles and how much memory that compactor is configured to use, you can increase/reduce the concurrent compaction level by increasing/reducing the number of partitions per shard by adjusting `INFLUXDB_IOX_COMPACTION_MAX_NUMBER_PARTITIONS_PER_SHARD`.
- **Number of partitions considered to compact:** If there is enough memory, which is usually the case, the compactor will compact many partitions concurrently. Depending on how much memory that compactor is configured to use, you can increase/reduce the concurrent compaction level by increasing/reducing the number of partitions.
- **Concurrency capacity:** To configure this based on your available memory, you need to understand how IOx estimates the memory needed to compact files, described in the next section.
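
To make the file-size formula in the list above concrete, here is a small Rust sketch (illustration only, not code from this PR); the function name and the example values are placeholders, and only the two env var names come from the doc.

```rust
// Illustrative arithmetic only. The parameters correspond to
// INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES and
// INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE; the values below are made up.
fn max_compacted_file_size_bytes(
    max_desired_file_size_bytes: u64,
    percentage_max_file_size: u64,
) -> u64 {
    max_desired_file_size_bytes * (100 + percentage_max_file_size) / 100
}

fn main() {
    // e.g. a 100 MiB desired size with a 30% allowance caps compacted files at ~130 MiB
    let cap = max_compacted_file_size_bytes(100 * 1024 * 1024, 30);
    println!("compacted files are capped at roughly {cap} bytes");
}
```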
# Memory Estimation
@ -196,15 +195,15 @@ SELECT * FROM skipped_compactions;
-- remove partitions from the skipped_compactions
DELETE FROM skipped_compactions WHERE partition_id in ([your_ids]);
-- Content of skipped_compactions with their shard index, partition key and table id
SELECT shard_index, table_id, partition_id, partition_key, left(reason, 25),
-- Content of skipped_compactions with their partition key and table id
SELECT table_id, partition_id, partition_key, left(reason, 25),
num_files, limit_num_files, estimated_bytes, limit_bytes, to_timestamp(skipped_at) skipped_at
FROM skipped_compactions, partition, shard
WHERE partition.id = skipped_compactions.partition_id and partition.shard_id = shard.id
ORDER BY shard_index, table_id, partition_key, skipped_at;
FROM skipped_compactions, partition
WHERE partition.id = skipped_compactions.partition_id
ORDER BY table_id, partition_key, skipped_at;
-- Number of files per level for top 50 partitions with most files of a specified day
SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key,
SELECT pf.table_id, pf.partition_id, p.partition_key,
count(case when pf.to_delete is null then 1 end) total_not_deleted,
count(case when pf.compaction_level=0 and pf.to_delete is null then 1 end) num_l0,
count(case when pf.compaction_level=1 and pf.to_delete is null then 1 end) num_l1,
@ -212,10 +211,10 @@ SELECT s.shard_index, pf.table_id, pf.partition_id, p.partition_key,
count(case when pf.compaction_level=0 and pf.to_delete is not null then 1 end) deleted_num_l0,
count(case when pf.compaction_level=1 and pf.to_delete is not null then 1 end) deleted_num_l1,
count(case when pf.compaction_level=2 and pf.to_delete is not null then 1 end) deleted_num_l2
FROM parquet_file pf, partition p, shard s
WHERE pf.partition_id = p.id AND pf.shard_id = s.id
FROM parquet_file pf, partition p
WHERE pf.partition_id = p.id
AND p.partition_key = '2022-10-11'
GROUP BY s.shard_index, pf.table_id, pf.partition_id, p.partition_key
GROUP BY pf.table_id, pf.partition_id, p.partition_key
ORDER BY count(case when pf.to_delete is null then 1 end) DESC
LIMIT 50;

View File

@ -30,7 +30,6 @@ fn main() -> Result<()> {
/// - `influxdata.iox.schema.v1.rs`
/// - `influxdata.iox.wal.v1.rs`
/// - `influxdata.iox.write.v1.rs`
/// - `influxdata.iox.write_buffer.v1.rs`
/// - `influxdata.platform.storage.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let authz_path = root.join("influxdata/iox/authz/v1");
@ -44,7 +43,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let querier_path = root.join("influxdata/iox/querier/v1");
let schema_path = root.join("influxdata/iox/schema/v1");
let wal_path = root.join("influxdata/iox/wal/v1");
let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
let storage_path = root.join("influxdata/platform/storage");
let storage_errors_path = root.join("influxdata/platform/errors");
@ -70,7 +68,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
schema_path.join("service.proto"),
wal_path.join("wal.proto"),
write_buffer_path.join("write_buffer.proto"),
storage_path.join("predicate.proto"),
storage_path.join("service.proto"),
storage_path.join("source.proto"),

View File

@ -1,63 +0,0 @@
syntax = "proto3";
package influxdata.iox.write_buffer.v1;
option go_package = "github.com/influxdata/iox/write_buffer/v1";
import "influxdata/iox/delete/v1/service.proto";
import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
// Configures the use of a write buffer.
message WriteBufferConnection {
reserved 1;
reserved "direction";
// Which type should be used (e.g. "kafka", "mock")
string type = 2;
// Connection string, depends on `type`.
string connection = 3;
// Old non-nested auto-creation config.
reserved 4, 5, 7;
// Special configs to be applied when establishing the connection.
//
// This depends on `type` and can configure aspects like timeouts.
map<string, string> connection_config = 6;
// Specifies if the shards (e.g. for Kafka in the form of a topic w/ `n_shards` partitions) should be
// automatically created if they do not exist prior to reading or writing.
WriteBufferCreationConfig creation_config = 8;
}
// Configs shard auto-creation for write buffers.
//
// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
// `n_shards` partitions.
message WriteBufferCreationConfig {
// Renamed from n_sequencers to n_shards.
reserved 1;
reserved "n_sequencers";
// Number of shards.
//
// How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of
// partitions.
//
// If 0, a server-side default is used
uint32 n_shards = 3;
// Special configs to be applied when shards are created.
//
// This depends on `type` and can setup parameters like retention policy.
//
// Contains 0 or more key value pairs
map<string, string> options = 2;
}
// A write payload for the write buffer
message WriteBufferPayload {
oneof payload {
influxdata.pbdata.v1.DatabaseBatch write = 1;
influxdata.iox.delete.v1.DeletePayload delete = 2;
}
}

View File

@ -38,8 +38,7 @@ pub struct IngesterQueryRequest {
}
impl IngesterQueryRequest {
/// Make a request to return data for a specified table for
/// all shards an ingester is responsible for
/// Make a request to return data for a specified table
pub fn new(
namespace_id: NamespaceId,
table_id: TableId,

View File

@ -173,19 +173,6 @@ pub mod influxdata {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.wal.v1.serde.rs"));
}
}
pub mod write_buffer {
pub mod v1 {
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_buffer.v1.rs"
));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write_buffer.v1.serde.rs"
));
}
}
}
pub mod pbdata {

View File

@ -9,8 +9,6 @@ license.workspace = true
chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" }
object_store = { version = "0.5.6", features = ["aws"] }
schema = { path = "../schema" }
@ -18,7 +16,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.96"
thiserror = "1.0.40"
tokio = { version = "1.28" }
tonic = { workspace = true }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]

View File

@ -37,7 +37,6 @@ parquet_to_line_protocol = { path = "../parquet_to_line_protocol" }
prost = { version = "0.11" }
iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
iox_time = { path = "../iox_time" }
trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }

View File

@ -83,7 +83,6 @@ async fn parquet_to_lp() {
// Looks like:
// {
// "id": "1",
// "shardId": 1,
// "namespaceId": 1,
// "tableId": 1,
// "partitionId": "1",

View File

@ -2,19 +2,15 @@
## Quick run
Set-up empty catalog db:
Set up empty catalog db:
```bash
mkdir -p /tmp/iox/{wal,obj}
createdb iox_shared
./target/debug/influxdb_iox catalog setup --catalog-dsn postgres:///iox_shared
# there has to exist one "topic", see https://github.com/influxdata/influxdb_iox/issues/6420
psql 'dbname=iox_shared options=-csearch_path=public,iox_catalog' -c "insert into topic (name) values ('iox-shared')"
```
Run ingester:
```bash

View File

@ -404,11 +404,11 @@ pub trait ColumnRepo: Send + Sync {
async fn list(&mut self) -> Result<Vec<Column>>;
}
/// Functions for working with IOx partitions in the catalog. Note that these are how IOx splits up
/// data within a namespace, which is different than Kafka partitions.
/// Functions for working with IOx partitions in the catalog. These are how IOx splits up
/// data within a namespace.
#[async_trait]
pub trait PartitionRepo: Send + Sync {
/// create or get a partition record for the given partition key, shard and table
/// create or get a partition record for the given partition key and table
async fn create_or_get(&mut self, key: PartitionKey, table_id: TableId) -> Result<Partition>;
/// get partition by ID
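
For illustration, a minimal hypothetical caller of the `create_or_get` signature shown above; the import paths and the `ensure_partition` helper name are assumptions, not code from this PR.

```rust
// Hypothetical caller of the trait shown above; import paths are assumed.
use data_types::{Partition, PartitionKey, TableId};
use iox_catalog::interface::{PartitionRepo, Result};

/// Idempotently ensure a partition record exists for the (table, key) pair.
async fn ensure_partition(
    repo: &mut dyn PartitionRepo,
    key: PartitionKey,
    table_id: TableId,
) -> Result<Partition> {
    // With shards gone, (table_id, partition key) alone identifies the record:
    // this returns the existing row if present, otherwise inserts a new one.
    repo.create_or_get(key, table_id).await
}
```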

View File

@ -7,7 +7,6 @@ use crate::{
Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result,
SoftDeletedRows, TableRepo, Transaction, MAX_PARQUET_FILES_SELECTED_ONCE,
},
kafkaless_transition::{Shard, SHARED_TOPIC_ID, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
@ -66,7 +65,6 @@ struct MemCollections {
namespaces: Vec<Namespace>,
tables: Vec<Table>,
columns: Vec<Column>,
shards: Vec<Shard>,
partitions: Vec<Partition>,
skipped_compactions: Vec<SkippedCompaction>,
parquet_files: Vec<ParquetFile>,
@ -121,26 +119,6 @@ impl Display for MemCatalog {
#[async_trait]
impl Catalog for MemCatalog {
async fn setup(&self) -> Result<(), Error> {
let guard = Arc::clone(&self.collections).lock_owned().await;
let stage = guard.clone();
let mut transaction = MemTxn {
inner: MemTxnInner::Txn {
guard,
stage,
finalized: false,
},
time_provider: self.time_provider(),
};
let stage = transaction.stage();
// The transition shard must exist and must have magic ID and INDEX.
let shard = Shard {
id: TRANSITION_SHARD_ID,
topic_id: SHARED_TOPIC_ID,
shard_index: TRANSITION_SHARD_INDEX,
};
stage.shards.push(shard);
transaction.commit_inplace().await?;
Ok(())
}

View File

@ -22,7 +22,6 @@ parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }
iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
futures = "0.3.28"

View File

@ -31,7 +31,7 @@ const METRIC_EVAL_INTERVAL: Duration = Duration::from_secs(3);
///
/// # Request Distribution
///
/// Requests are distributed uniformly across all shards **per thread**. Given
/// Requests are distributed uniformly across all endpoints **per thread**. Given
/// enough requests (where `N` is significantly larger than the number of
/// threads) an approximately uniform distribution is achieved.
#[derive(Debug)]
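
To illustrate the distribution described in this doc comment, here is a hedged Rust sketch (not the PR's implementation) that picks an endpoint uniformly at random per request; the `rand` dependency and endpoint URLs are assumptions.

```rust
// Not the PR's implementation: a per-thread sketch of uniform request
// distribution across endpoints. Uses the `rand` crate (assumed dependency);
// the endpoint URLs are hypothetical.
use rand::Rng;

fn pick_endpoint<'a, T>(endpoints: &'a [T]) -> &'a T {
    // Each worker thread owns its own RNG, so selection is uniform per thread
    // and approximately uniform overall once enough requests are issued.
    let idx = rand::thread_rng().gen_range(0..endpoints.len());
    &endpoints[idx]
}

fn main() {
    let endpoints = ["http://endpoint-a:8080", "http://endpoint-b:8080"];
    for _ in 0..5 {
        println!("sending request to {}", pick_endpoint(&endpoints));
    }
}
```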