Connects to #6421.
Even if the ingester doesn't have data in memory for a query, we need to
send back metadata about the ingester UUID and the number of files
persisted so that the querier can decide whether it needs to refresh the
cache.
This PR uses the MutableBatch persist cost estimation added in #6425 to
selectively mark "hot" partitions for persistence.
This uses a (composable!) "post-write" observer that is invoked after
each buffer call - this allows the HotPartitionPersister in this commit
to inspect the cost of the partition after applying the write, and if it
exceeds the configurable cost threshold, enqueue it for persistence
(rotating the buffer within the partition in the process).
Unlike ingester(1), this implementation prevents overrun - the
application of the write that exceeds the cost limit, and enqueueing the
partition for persistence is atomic.
This commit changes the behaviour of the persist system to enable
optimal parallelism of persist operations, and improve the accuracy of
the outstanding job bound / back-pressure.
Previously all persist operations for a given partition were
consistently hashed to a single worker task. This serialised persistence
per partition, ensuring all updates to the partition sort key were
serialised. However, this also unnecessarily serialises persist
operations that do not need to update the sort key, reducing the
potential throughput of the system; in the worst case of a single
partition receiving all the writes, only one worker would be persisting,
and the other N-1 workers would be idle.
After this change, the sort key is inspected when enqueuing the persist
operation and if it can be determined that no sort key update is
necessary (the typical case), then the persist task is placed into a
global work queue from which all workers consume. This allows for
maximal parallelisation of these jobs, and the removes the per-worker
head-of-line blocking.
In the case that the sort key does need updating, these jobs continue to
be consistently hashed to a single worker, ensuring serialised sort key
updates only where necessary.
To support these changes, the back-pressure system has been changed to
account for all outstanding persist jobs in the system, regardless of
type or assigned worker - a logical, bounded queue is composed together
of a semaphore limiting the number of persist tasks overall, and a
series of physical, unbounded queues - one to each worker & the global
queue. The overall system remains bounded by the
INFLUXDB_IOX_PERSIST_QUEUE_DEPTH value, and is now simpler to reason
about (it is independent of the number of workers, etc).
Instead of recording the "enqueued_at" when initialising the
PersistRequest, inject the value in.
This lets us re-order the request construction while retaining accurate
timing.
Previously data within a partition had to be persisted in the order in
which the data was received. This was necessary for the correctness of
the query API, as it utilised the lower-bound sequence number to
determine what data was available in the object store.
With the changes to the parquet discovery protocol / query API made in
https://github.com/influxdata/influxdb_iox/pull/6365 this restriction
can be lifted, allowing out-of-order persistence within a partition for
increased parallelism / performance.
This commit changes the PartitionData to accept out-of-order persist
completion notifications, removing the ordering invariant from ingester2
(note that the persist ops currently remain ordered however).
Adds a peek() method to the DeferredLoad construct, allowing a caller to
immediately read the resolved value, or "None" if the value is
unresolved or concurrently resolving.
This allows a caller to optimistically read the value without having to
block and wait for it to become available.
This commit causes an ingester2 instance to stop accepting new writes
when at least one persist queue is full. Writes continue to be rejected
until the persist workers have processed enough outstanding persist
tasks to drain the queues to half of their capacity, at which point
writes are accepted again.
When a write is rejected, the ingester returns a "resource exhausted"
RPC code to the caller.
Checking if the system is in a healthy state for writes is extremely
cheap, as it is on the hot path for all writes.
* feat: optimize wal with batching
Simplified the wal writer so that it batches up write operations. Currently it waits 10ms between fsync calls. We can pull this out to a config variable later if we want, but I think this is good enough for now.
Also updated the reader to be a more simple blocking reader without the extra tasks and channels as that wasn't really getting us anything that I know of.
* chore: cleanup wal code for PR feedback
Removes the submission queue from the persist fan-out, instead the
PersistHandle now carries the shared state internally (cheaply cloned
via ref counts).
This also resolves the persist deadlock when under load.
Adds more debug logging to the persist code paths, as well as capturing
& logging (at INFO) timing information tracking the time a persist task
spends in the queue, the active time spent actually persisting the data,
and the total duration of time since the request was created (sum of
both durations).
This adds a simple WAL replay benchmark to ingester2 that executes a
replay of a single line of LP.
Unfortunately each file in the benches directory is compiled as it's own
binary/crate, and as such is restricted to importing only "pub" types.
This sucks, as it requires you to either benchmark at a high level
(macro, not microbenchmarks - i.e. benchmarking the ingester startup,
not just the WAL replay) or you are forced to mark the reliant types &
functions as "pub", as well as all the other types/traits they reference
in their signatures. Because the performance sensitive code is usually
towards the lower end of the call stack, this can quickly lead to an
explosion of "pub" types causing a large amount of internal code to be
exported.
Instead this commit uses a middle-ground; benchmarked types & fns are
conditionally marked as "pub" iff the "benches" feature is enabled. This
prevents them from being visible by default, but allows the benchmark
function to call them.
The benchmark itself is also restricted to only run when this feature is
enabled.
* refactor: DF-driven on-demand mem limit instead of ahead-of-time heuristics
Closes#6310.
* refactor: rename and tune default exec mem limits
* fix: ingester2 bits after rebase
The ingester will replay the ops, and then immediately trigger a
persist! This comment is done :)
Outstanding is actually deleting the replayed files, which will come
with the ref-counted WAL segment change later.
Changes the WAL rotation task to drop the WAL segment after the
partition data has completely persisted.
This logic contains a race condition outlined (at length) in the code
comments, along with a plan to resolve it once I'm back from holiday.
For now, the extremely low likelihood and minor impact of the race
occurring is likely acceptable for testing purposes.
Changes the WAL rotation code to cause the current set of partitions to
be submitted for persistence.
Because rotating the WAL segment and triggering persistence is not
atomic (not under an exclusive lock preventing writes) the persisted
buffers MAY contain writes that appear in the new segment file.
In the happy/non-crash path, this will have no effect, however if the
ingester crashes and replays the WAL files, these writes will be
duplicated into object storage (where compaction will resolve the issue)
- this seems like a good trade-off, allowing us to avoid blocking write
requests for as long as it takes to rotate the WAL and mark all
partitions as persisting.
(though currently WAL files are not dropped and thus everything is
replayed all the time.)
Implements actor-based, parallel persistence in ingester2 with
controllable fan-out parallelism and queue depths.
This implementation encapsulates the complexity of persistence, queuing
and parallelism - the caller simply uses the handle to persist a
partition, while the actor handles fan-out to a set of persistence
workers, compaction in a separate thread-pool, and optional completion
notifications.
By consistently hashing persist jobs onto workers, parallelism is
achieved across partitions, but serialisation of partition persists is
enforced so that the sort key update is correctly serialised.
This commit swaps the existing single "persisting batch" slot (field) in
a PartitionData for an ordered queue of outstanding partitions.
This decouples marking a partition buffer for persistence from the
actual persistence operation, allowing them to proceed at different
rates. This reduces the complexity of persistence management, but also
allows us to gracefully handle "hot" partitions; for example, this
problematic scenario in the ingester(1) implementation during recovery:
* Writes come into a partition, reaching a size/row/hotness limit
* Partition is enqueued for persistence
* More writes come into the new buffer, exceeding the same limit
* Cannot persist the hot buffer because of outstanding persist op
Without this change the only possibilities in this situation are:
* stop ingest for the partition and error all writes that
(partially!) write to the partition, or
* continue accepting writes, allowing the partition to exceed the
limit that marked it for persistence in the first place
The latter is what the ingester(1) implementation does today, which
results in partitions exceeding their row/size/age limits, which exist
to limit the cost of generating the Parquet file from the buffer - this
is a significant contributor to instability during recovery.
This strategy enforces configured the limits on the partition buffer,
but does not block / slow down recovery while persistence is completed.
Pushes the (ref-counted/shared) deferred NamespaceName resolver into the
child TableData and PartitionData nodes in the BufferTree.
This allows the PartitionData to provide the name of the namespace to
which it belongs, in addition to the existing table name, and all
relevant IDs.
Replay the WAL log, if any, at startup.
Op replay is performed synchronously, during initialisation of the
ingester2 instance, and passes all ops through the "normal" write path
that the system uses once replay is complete, minus the WAL writer layer
- this helps to DRY the write path and minimise different behaviours.
Now partition cache entries are smaller, the number of entries held in
memory can be increased - this now uses ~2MiB of memory and drains the
cache during execution, amortising to 0.