* chore: document and test split_percentage and percentage_max_file_size
* fix: Apply suggestions from code review
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* chore: add test with both max file size and split percentage
* docs: whitespace engineering and small typo
---------
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* refactor: n_threads and n_target_partitions are non-zero
Zero values will just panic. Prevent that earlier.
* fix: typo
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---------
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* feat: initial implementation of the split
* feat: split many L0 files in groups and compact them into new and fewer L0 files
* test: remove iappropriate AllAtOnce test
* refactor: move file classification for initial target to its own function
* fix: pop the branch from start to end
* chore: address review comments
* feat: support splitting to many L1 files
* feat: only add extra round to compact level-n files to same level-n files if their files plus overlapped level-n-plus-1 over limit
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
* chore: final cleanup and address comments
* chore: run fmt
---------
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Fixes#6418.
Makes sure the querier, the router, and the ingest replica CLI all
accept and validate ingester addresses the same, except whether or not
at least one value is required.
* feat: `PartitionRepo::list_ids`
* refactor: `CatalogPartitionsSource` => `CatalogToCompactPartitionsSource`
* feat: allow the compactor to process all known partitions
Closes#6648.
* docs: improve
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
---------
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Instead of looping and polling a fresh set of partitions and
constructing a stream from that, use an endless stream instead. This
helps w/ efficiency during roll-overs since we can already start to
process the next set of partitions while the last ones from the previous
round are still in-progress.
Closes#6750.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: partition filters for TargetLevel version and a complete test
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
* chore: run fmt after applying review suggestions in git
---------
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: rename compact algo versions to reflect thier actual work
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
---------
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
I'm not saying we have to use this, but this is a demonstration how easy
it would be to add sharding to the compaction tier and also acts as a
"backup / insurance" if we ever need it.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Add some rough "partition is too big" filter for now until we can deal
with them (the framework allows that but we need to set up the proper
divide-and-conquer components).
This will hopefully prevent our prod compactor from dying that often.
Note that this is also duct-tape around two issues:
- DataFusion not accounting in-flight data all the time
- Our wide fan-out query plans (see https://github.com/influxdata/idpe/issues/16768#issuecomment-1387056833 )
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* fix: update clap parser for --ingester-addresses
* fix: make querier2 specify ingester addrs same as router2
* fix: update clap parser args but do not prepend http://
* chore: cargo fmt
* feat: introduce scratchpad store for compactor
Use an intermediate in-memory store (can be a disk later if we want) to
stage all inputs and outputs of the compaction. The reasons are:
- **fewer IO ops:** DataFusion's streaming IO requires slightly more
IO requests (at least 2 per file) due to the way it is optimized to
read as little as possible. It first reads the metadata and then
decides which content to fetch. In the compaction case this is (esp.
w/o delete predicates) EVERYTHING. So in contrast to the querier,
there is no advantage of this approach. In contrary this easily adds
100ms latency to every single input file.
- **less traffic:** For divide&conquer partitions (i.e. when we need to
run multiple compaction steps to deal with them) it is kinda pointless
to upload an intermediate result just to download it again. The
scratchpad avoids that.
- **higher throughput:** We want to limit the number of concurrent
DataFusion jobs because we don't wanna blow up the whole process by
having too much in-flight arrow data at the same time. However while
we perform the actual computation, we were waiting for object store
IO. This was limiting our throughput substantially.
- **shadow mode:** De-coupling the stores in this way makes it easier to
implement #6645.
Note that we assume here that the input parquet files are WAY SMALLER
than the uncompressed Arrow data during compaction itself.
Closes#6650.
* fix: panic on shutdown
* refactor: remove shadow scratchpad (for now)
* refactor: make scratchpad safe to use
It seems that prod was hanging last night. This is pretty hard to debug
and in general we should protect the compactor against hanging /
malformed partitions that take forever. This is similar to the fact that
the querier also has a timeout for every query. Let's see if this shows
anything in prod (and if not it's still a desired safety net).
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* chore: address review comment of previous PR
* refactor: execute compact plan
* refactor: we will now compact all L0 and L1 files of a partition and split them as needed
* chore: comnents
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
- use a single data structure for CLI args (not two)
- set mem limit default to 8GB (same as querier). We can always tune
this later, but we should not run with "unlimited" to begin with.
Sets up crate and wires up the main binary. No tests yet, no algorithm
framework, just the bare minimum.
Also I decided to not offer a gRPC server in `compactor2` at the moment
and hence did not implement any handle/delegate infrastructure. We add
this later if we need it. This also means compactor2 does NOT provide a
catalog service for now.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: cold
* chore: debug info
* feat: only compact qualified cold partition candidates
* fix: catalog test
* chore: cleanup
* chore: add new config flag for cold partition candidates
* chore: implement display for CompactionType and add tests for max num partitions
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: Update replication.proto
* Remove the PartitionId in the replicate request as a single replicate request can have the data for many partitions.
* Add namespace_id and table_id to persist complete request to make data easier to lookup in buffer.
* feat: Initial ingest_replica skeleton
A bunch of copy pasta here from ingester2, but this takes out a ton of stuff that isn't used in replicas.
Also lays the groundwork for the simpler buffer structure to keep the data and a basic cache for catalog information that will be required.
* feat: update replication.proto GetPartitionBufferResponse
* chore: PR cleanup
* chore: PR cleanup
This PR uses the MutableBatch persist cost estimation added in #6425 to
selectively mark "hot" partitions for persistence.
This uses a (composable!) "post-write" observer that is invoked after
each buffer call - this allows the HotPartitionPersister in this commit
to inspect the cost of the partition after applying the write, and if it
exceeds the configurable cost threshold, enqueue it for persistence
(rotating the buffer within the partition in the process).
Unlike ingester(1), this implementation prevents overrun - the
application of the write that exceeds the cost limit, and enqueueing the
partition for persistence is atomic.
This commit changes the behaviour of the persist system to enable
optimal parallelism of persist operations, and improve the accuracy of
the outstanding job bound / back-pressure.
Previously all persist operations for a given partition were
consistently hashed to a single worker task. This serialised persistence
per partition, ensuring all updates to the partition sort key were
serialised. However, this also unnecessarily serialises persist
operations that do not need to update the sort key, reducing the
potential throughput of the system; in the worst case of a single
partition receiving all the writes, only one worker would be persisting,
and the other N-1 workers would be idle.
After this change, the sort key is inspected when enqueuing the persist
operation and if it can be determined that no sort key update is
necessary (the typical case), then the persist task is placed into a
global work queue from which all workers consume. This allows for
maximal parallelisation of these jobs, and the removes the per-worker
head-of-line blocking.
In the case that the sort key does need updating, these jobs continue to
be consistently hashed to a single worker, ensuring serialised sort key
updates only where necessary.
To support these changes, the back-pressure system has been changed to
account for all outstanding persist jobs in the system, regardless of
type or assigned worker - a logical, bounded queue is composed together
of a semaphore limiting the number of persist tasks overall, and a
series of physical, unbounded queues - one to each worker & the global
queue. The overall system remains bounded by the
INFLUXDB_IOX_PERSIST_QUEUE_DEPTH value, and is now simpler to reason
about (it is independent of the number of workers, etc).
Removes the submission queue from the persist fan-out, instead the
PersistHandle now carries the shared state internally (cheaply cloned
via ref counts).
This also resolves the persist deadlock when under load.
* refactor: DF-driven on-demand mem limit instead of ahead-of-time heuristics
Closes#6310.
* refactor: rename and tune default exec mem limits
* fix: ingester2 bits after rebase
* chore: remove unused/moved ns_autocreation dml handler
* feat(router): expose new ns retention as config
* fix: forgot to set default value for router retention arg
* chore: make new namespace retention param an option
* feat: compactor ignores max file count for first file
chore: typo in comment in compactor
* feat: restore special first file in partition compaction logic; add limit
* fix: calculation in compaction max file count
chore: clippy
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: config param to set when partition is cold
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
* fix: make default 8 hours and avoid using 8 * 60 becasue it is a string, not expression which makes a test fail
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* revert: "revert: rdkafka/rskafka swapping (#5800)"
This reverts commit b77c3540e1.
* test: Verify write buffer connection_config is parsed as expected
* test: Failing test reproducing the error seen when deploying rdkafka
* fix: Translate k8s-idpe configs to rdkafka configs