Removes the submission queue from the persist fan-out; instead, the
PersistHandle now carries the shared state internally (cheaply cloned
via ref counts).
This also resolves the persist deadlock that occurs under load.
Adds more debug logging to the persist code paths, and captures &
logs (at INFO) timing information: the time a persist task spends in
the queue, the active time spent actually persisting the data, and the
total duration since the request was created (the sum of both).
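Roughly, the timing capture looks like this (a sketch only - the
PersistRequest type and field names here are illustrative, not the real
ones):

```rust
use std::time::Instant;

/// Hypothetical request type: `created_at` is recorded when the persist
/// request is created and enqueued.
struct PersistRequest {
    created_at: Instant,
    // ... the partition data to persist
}

/// Called by a worker once the persist job completes; `started_at` is the
/// instant the worker dequeued the job and began persisting.
fn log_persist_timings(req: &PersistRequest, started_at: Instant) {
    let queued = started_at.duration_since(req.created_at);
    let active = started_at.elapsed();
    let total = req.created_at.elapsed();
    // The real code emits these at INFO via the observability stack;
    // println! is a stand-in here.
    println!("persist complete; queued={queued:?} active={active:?} total={total:?}");
}
```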
This adds a simple WAL replay benchmark to ingester2 that executes a
replay of a single line of LP.
Unfortunately each file in the benches directory is compiled as its own
binary/crate, and as such is restricted to importing only "pub" types.
This sucks, as it requires you to either benchmark at a high level
(macro, not microbenchmarks - i.e. benchmarking the ingester startup,
not just the WAL replay) or mark the relevant types & functions as
"pub", as well as all the other types/traits they reference in their
signatures. Because the performance-sensitive code usually sits towards
the lower end of the call stack, this can quickly lead to an explosion
of "pub" types, causing a large amount of internal code to be exported.
Instead this commit uses a middle ground; benchmarked types & fns are
conditionally marked as "pub" iff the "benches" feature is enabled. This
prevents them from being visible by default, but allows the benchmark
function to call them.
The benchmark itself is also restricted to run only when this feature is
enabled.
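For illustration, one way to express that conditional visibility (the
module name here is made up; the real code may gate individual items
differently):

```rust
// In lib.rs: the module body lives in src/wal_replay.rs; only its
// visibility changes with the "benches" feature.
#[cfg(feature = "benches")]
pub mod wal_replay;

#[cfg(not(feature = "benches"))]
mod wal_replay;
```

The benchmark target itself can be gated in the same way, e.g. by
listing `required-features = ["benches"]` on the `[[bench]]` entry in
Cargo.toml so it only builds when the feature is enabled.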
* refactor: DF-driven on-demand mem limit instead of ahead-of-time heuristics
Closes #6310.
* refactor: rename and tune default exec mem limits
* fix: ingester2 bits after rebase
The ingester will replay the ops, and then immediately trigger a
persist! This component is done :)
Outstanding is actually deleting the replayed files, which will come
with the ref-counted WAL segment change later.
Changes the WAL rotation task to drop the WAL segment after the
partition data has completely persisted.
This logic contains a race condition outlined (at length) in the code
comments, along with a plan to resolve it once I'm back from holiday.
For now, the extremely low likelihood and minor impact of the race
occurring make it acceptable for testing purposes.
Changes the WAL rotation code to submit the current set of partitions
for persistence.
Because rotating the WAL segment and triggering persistence is not
atomic (it is not performed under an exclusive lock preventing writes),
the persisted buffers MAY contain writes that also appear in the new
segment file.
In the happy/non-crash path this has no effect; however, if the ingester
crashes and replays the WAL files, these writes will be duplicated into
object storage (where compaction will resolve the issue) - this seems
like a good trade-off, allowing us to avoid blocking write requests for
as long as it takes to rotate the WAL and mark all partitions as
persisting.
(though currently WAL files are not dropped and thus everything is
replayed all the time.)
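Roughly, the rotation flow looks like the sketch below - every type and
method name here is an illustrative stand-in, not the real ingester2
API:

```rust
// Illustrative stand-ins only - not the real ingester2 types or API.
struct ClosedSegment;

struct Wal;
impl Wal {
    fn rotate(&self) -> ClosedSegment {
        // New writes land in a fresh segment file from this point on.
        ClosedSegment
    }
}

struct PartitionHandle;

struct BufferTree;
impl BufferTree {
    fn partitions(&self) -> Vec<PartitionHandle> {
        vec![]
    }
}

struct PersistHandle;
impl PersistHandle {
    async fn persist(&self, _partition: PartitionHandle) {
        // Enqueue the partition's buffered data for persistence.
    }
}

async fn rotate_and_persist(wal: &Wal, buffer: &BufferTree, persist: &PersistHandle) {
    // Rotation and persist-marking are not performed under one exclusive
    // lock, so the buffers persisted below may also contain writes recorded
    // in the new segment file (duplicated on replay, deduplicated later by
    // compaction).
    let closed_segment = wal.rotate();

    for partition in buffer.partitions() {
        persist.persist(partition).await;
    }

    // Once all persist jobs have completed, `closed_segment` can be dropped
    // and deleted (subject to the race documented in the code comments).
    drop(closed_segment);
}
```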
Implements actor-based, parallel persistence in ingester2 with
controllable fan-out parallelism and queue depths.
This implementation encapsulates the complexity of persistence, queuing
and parallelism - the caller simply uses the handle to persist a
partition, while the actor handles fan-out to a set of persistence
workers, compaction in a separate thread-pool, and optional completion
notifications.
By consistently hashing persist jobs onto workers, parallelism is
achieved across partitions, but serialisation of partition persists is
enforced so that the sort key update is correctly serialised.
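The job-to-worker mapping boils down to a deterministic hash of the
partition ID; a minimal sketch (the DefaultHasher / modulo choice is an
assumption, not the real implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministically map a partition ID onto one of `n_workers` queues so
/// that all persist jobs for a partition are serialised through the same
/// worker, while distinct partitions fan out across workers in parallel.
fn worker_for_partition(partition_id: i64, n_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_id.hash(&mut hasher);
    (hasher.finish() as usize) % n_workers
}
```

Because the mapping is a pure function of the partition ID, two persist
jobs for the same partition can never execute concurrently on different
workers, which is what keeps the sort key update serialised.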
This commit swaps the existing single "persisting batch" slot (field) in
a PartitionData for an ordered queue of outstanding partitions.
This decouples marking a partition buffer for persistence from the
actual persistence operation, allowing them to proceed at different
rates. This reduces the complexity of persistence management, but also
allows us to gracefully handle "hot" partitions; for example, this
problematic scenario in the ingester(1) implementation during recovery:
* Writes come into a partition, reaching a size/row/hotness limit
* Partition is enqueued for persistence
* More writes come into the new buffer, exceeding the same limit
* Cannot persist the hot buffer because of outstanding persist op
Without this change the only possibilities in this situation are:
* stop ingest for the partition and error all writes that
(partially!) write to the partition, or
* continue accepting writes, allowing the partition to exceed the
limit that marked it for persistence in the first place
The latter is what the ingester(1) implementation does today, which
results in partitions exceeding their row/size/age limits, which exist
to limit the cost of generating the Parquet file from the buffer - this
is a significant contributor to instability during recovery.
This strategy enforces the configured limits on the partition buffer,
but does not block / slow down recovery while persistence completes.
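A simplified sketch of the structural change (type and field names are
approximations):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

// Placeholder types for illustration only.
struct DataBuffer;
struct PersistingBatch;

/// Before: a single `Option<PersistingBatch>` slot allowed at most one
/// outstanding persist per partition. After: an ordered queue lets the hot
/// buffer be marked for persistence again even while earlier jobs for the
/// same partition are still outstanding.
struct PartitionData {
    /// The currently active write buffer.
    buffer: DataBuffer,
    /// Buffers marked for persistence, oldest first.
    persisting: VecDeque<Arc<PersistingBatch>>,
}
```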
Pushes the (ref-counted/shared) deferred NamespaceName resolver into the
child TableData and PartitionData nodes in the BufferTree.
This allows the PartitionData to provide the name of the namespace to
which it belongs, in addition to the existing table name, and all
relevant IDs.
Replay the WAL, if any, at startup.
Op replay is performed synchronously, during initialisation of the
ingester2 instance, and passes all ops through the "normal" write path
that the system uses once replay is complete, minus the WAL writer layer
- this helps to DRY the write path and minimise behavioural differences.
Now that partition cache entries are smaller, the number of entries held
in memory can be increased - this now uses ~2MiB of memory and drains
the cache during execution, amortising to 0.
This commit removes the invariant asserts of monotonicity carried over
from the "ingester" crate - ingester2 does not define any ordering of
writes within the system.
This commit also removes the SequenceNumberRange as it is no longer
useful to indirectly check the equality of two sets of ops - with
non-monotonic writes, overlapping ranges do not guarantee a full set of
overlapping operations (gaps may be present). Likewise bounding values
(such as "min persisted") provide no useful meaning in an out-of-order
system.
Document the arbitrary reordering of concurrent writes in an ingester,
and the potential divergence of WAL entries / buffered state.
Also documents that in ingester2, sequence numbers only identify writes,
not their ordering.
Adds WalSink, an implementation of the DmlSink trait that commits DML
operations to the write-ahead log before passing the DML op into the
decorated inner DmlSink chain.
By structuring the WAL logic as a decorator, the chain of inner DmlSink
handlers is only ever invoked after the WAL commit has successfully
completed, which keeps all the WAL commit code in a singly-responsible
component. This also lets us layer the WAL commit logic onto the DML
sink chain after replaying any existing WAL files, avoiding a circular
WAL mess.
The application-logic level WAL code abstracts over the underlying WAL
implementation & codec through the WalAppender trait. This decouples the
business logic from the WAL implementation both for testing purposes,
and for trying different WAL implementations in the future.
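The shape of the decorator is roughly the following sketch - the
DmlSink / WalAppender names come from this commit, but the signatures
and error types are simplified assumptions:

```rust
use async_trait::async_trait;

// Simplified stand-ins; the real DmlOperation / error types differ.
struct DmlOperation;
#[derive(Debug)]
struct DmlError;

/// The sink abstraction DML ops are pushed through.
#[async_trait]
trait DmlSink: Send + Sync {
    async fn apply(&self, op: DmlOperation) -> Result<(), DmlError>;
}

/// Abstracts the underlying WAL implementation & codec.
#[async_trait]
trait WalAppender: Send + Sync {
    async fn append(&self, op: &DmlOperation) -> Result<(), DmlError>;
}

/// Commits each DML op to the WAL before handing it to the decorated inner
/// sink chain.
struct WalSink<W, T> {
    /// The WAL implementation & codec, behind the WalAppender abstraction.
    wal: W,
    /// The decorated inner DmlSink chain.
    inner: T,
}

#[async_trait]
impl<W, T> DmlSink for WalSink<W, T>
where
    W: WalAppender,
    T: DmlSink,
{
    async fn apply(&self, op: DmlOperation) -> Result<(), DmlError> {
        // The inner chain is only ever invoked after the WAL commit succeeds.
        self.wal.append(&op).await?;
        self.inner.apply(op).await
    }
}
```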
* chore: Update Datafusion and arrow/arrow-flight/parquet to `28.0.0`
* chore: Update thrift to 0.17
* fix: use workspace arrow-flight in ingester2
* chore: Update for API changes
* fix: test
* chore: Update hakari
* chore: Update hakari again
* chore: Update trace_exporters to latest thrift
* fix: update test
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Sequence all gRPC write requests to (internally) order the resulting DML
operations.
These sequence numbers are assigned from a timestamp oracle and passed
through to the downstream DmlSink implementers.
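A minimal sketch of such an oracle, assuming a simple shared atomic
counter (the real implementation may differ):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hands out unique, increasing sequence numbers; cheap to share between
/// gRPC request handlers.
#[derive(Debug, Default)]
struct TimestampOracle(AtomicU64);

impl TimestampOracle {
    fn next(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Relaxed)
    }
}
```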
This commit implements the QueryExec trait for the BufferTree, allowing
it to be queried for the partition data it contains. With this change,
the BufferTree now provides "read your writes" functionality.
Notably the implementation streams the contents of individual partitions
to the caller on demand (pull-based execution), deferring acquisition of
the partition lock until actually necessary and minimising how long a
strong reference to a specific RecordBatch is held, in order to reduce
memory overhead.
During query execution a client sees a consistent snapshot of
partitions: once a client begins streaming the query response, incoming
writes that create new partitions do not become visible. However,
incoming writes to an existing partition that forms part of the snapshot
set become visible iff they are ordered before the acquisition of the
partition lock when streaming that partition's data to the client.
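In sketch form, the pull-based execution looks like the following, with
a generic element type standing in for the real partition / RecordBatch
types:

```rust
use std::sync::{Arc, Mutex};

use futures::{stream, Stream, StreamExt};

/// The partition set is captured once (the snapshot), but each partition's
/// lock is acquired only when the consumer polls that element, so strong
/// references to the buffered data are held as briefly as possible.
fn partition_stream<T: Clone>(
    snapshot: Vec<Arc<Mutex<T>>>,
) -> impl Stream<Item = T> {
    stream::iter(snapshot).map(|p| p.lock().expect("lock poisoned").clone())
}
```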
Allow the return type of the QueryExec trait's query_exec() method to be
parametrised by the implementer.
This allows the trait to be reused across different data sources that
return differing concrete types.
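In sketch form (the method parameters and error type below are
assumptions):

```rust
use async_trait::async_trait;

#[derive(Debug)]
struct QueryError;

/// Each implementer chooses its own concrete response type, so the trait
/// can be reused across data sources that return differing types.
#[async_trait]
trait QueryExec: Send + Sync {
    type Response: Send;

    async fn query_exec(
        &self,
        namespace_id: i64,
        table_id: i64,
        columns: Vec<String>,
    ) -> Result<Self::Response, QueryError>;
}
```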
Changes the query code (taken from the ingester crate) to stream data
for query execution, tidying up unnecessary Result types and removing
unnecessary indirection/boxing.
Previously the query data sourcing would collect the set of RecordBatch
for a query response during execution, prior to sending the data to the
caller. Any data that was dropped or modified during this time meant the
underlying ref-counted data could not be released from memory until all
outstanding queries referencing it completed. When faced with multiple
concurrent queries and ongoing ingest, this meant multiple copies of
data could be held in memory at any one time.
After this commit, data is streamed to the user, minimising the duration
of time a reference to specific partition data is held, and therefore
eliminating the memory overhead of holding onto all the data necessary
for a query for as long as the client takes to read the data.
When combined with an upcoming PR to stream RecordBatch out of the
BufferTree, this should provide performant query execution with minimal
memory overhead, even for a maliciously slow reading client.
Implement a QueryExec decorator that emits named tracing spans covering
the inner delegate's query_exec() execution.
Captures the result, emitting the error string in the span on failure.
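A sketch of the decorator, using the `tracing` crate as a stand-in for
the project's own span types and a simplified QueryExec trait; the
names and signatures below are assumptions:

```rust
use async_trait::async_trait;
use tracing::Instrument;

#[derive(Debug)]
struct QueryError(String);

// Simplified QueryExec trait (same shape as the earlier sketch).
#[async_trait]
trait QueryExec: Send + Sync {
    type Response: Send;
    async fn query_exec(&self, namespace_id: i64, table_id: i64)
        -> Result<Self::Response, QueryError>;
}

/// Decorator that wraps the inner delegate's query_exec() in a named span,
/// emitting the error string on failure.
struct QueryExecTracing<T> {
    inner: T,
    name: &'static str,
}

#[async_trait]
impl<T> QueryExec for QueryExecTracing<T>
where
    T: QueryExec,
{
    type Response = T::Response;

    async fn query_exec(&self, namespace_id: i64, table_id: i64)
        -> Result<Self::Response, QueryError>
    {
        let span = tracing::info_span!("query_exec", handler = self.name);
        let res = self
            .inner
            .query_exec(namespace_id, table_id)
            .instrument(span.clone())
            .await;
        if let Err(QueryError(msg)) = &res {
            // Emit the error string within the span on failure.
            span.in_scope(|| tracing::error!(error = %msg, "query_exec failed"));
        }
        res
    }
}
```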
This commit implements the gRPC direct-write RPC interface (largely
copied from the ingester crate), and adds a much improved RPC query
handler.
Compared to the ingester crate, the query API is now split into two
defined halves - the API handler side and the types necessary to support
it (server/grpc/query.rs), and the Ingester query execution side (a stub
in query/exec.rs). These two halves maintain a separation of concerns,
and are interfaced by an abstract QueryExec trait (in query/trait.rs).
I also added the catalog RPC interface as it is currently exposed on the
ingester, though I am unsure if it is used by anything.
This commit also introduces the "init" module, and the
IngesterRpcInterface trait within it. This trait forms the public
ingester2 crate API, defining the complete set of methods external
crates can expect to utilise in a stable, unchanging and decoupled way.
The IngesterRpcInterface trait also serves as a method of type-erasure
on the underlying handler implementations, avoiding the need to
expose/pub the types, abstractions, and internal implementation details
of the ingester to external crates.
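As a sketch of the type-erasure idea (the handler traits and method
signatures below are made up for illustration; only the trait name comes
from this commit):

```rust
// Hypothetical bounds exposed to external crates; in practice these would
// be the tonic-generated gRPC service traits.
pub trait CatalogService: Send + Sync {}
pub trait WriteService: Send + Sync {}

/// The public face of the ingester: external crates name only this trait
/// and the bounds on its associated types, never the concrete handler
/// implementations or the internals behind them.
pub trait IngesterRpcInterface: Send + Sync {
    type CatalogHandler: CatalogService;
    type WriteHandler: WriteService;

    fn catalog_service(&self) -> Self::CatalogHandler;
    fn write_service(&self) -> Self::WriteHandler;
}
```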
Adds an ingester2 crate to hold the MVP of the Kafkaless project.
This was necessary due to the tight coupling of the ingester internals
with tests in external crates, and eases the parallel development of two
versions of the ingester.
This commit contains various changes from the "ingester" crate, mostly
removing the concept/references to a "shard" or "ShardId" where
possible.
This commit does not copy over all of the "ingester" crate - only those
components that are definitely needed. I will drag across more as
functionality is implemented.