Moves the "you've tried to seek into the future!" error to the point at
which the seek attempt was made.
This makes more sense than deferring the seek error until read time, and
makes it far easier to determine what has happened at the seek call site
than at read time (where the read response error contains an invalid
high_watermark value of -1, making it impossible to conclusively
determine what has happened).
In staging we observed an ingester panic due to the write buffer stream
yielding a WriteBufferErrorKind::SequenceNumberAfterWatermark,
suggesting the ingester was attempting to read from an offset that
exceeds the current max write offset in Kafka (high watermark offset).
This turned out not to be the case - the partition had a single write at
offset 2, and the ingester was attempting to seek to offset 1. The first
read would fail (offset 1 does not exist) and the error handling did not
account for the high watermark not being correctly set (-1 in the
response).
I have no idea why rskafka returns this watermark value / doesn't retry,
but this change will allow the ingesters to recover.
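As a minimal sketch of the idea, assuming a hypothetical `validate_seek`
helper and `SeekError` type (not the real write buffer API): the offset
is checked against the high watermark at the point of the seek, so the
error carries full context instead of surfacing as an ambiguous read
failure later.

```rust
/// Hypothetical stand-ins for illustration only.
#[derive(Debug, PartialEq)]
enum SeekError {
    /// The caller tried to seek beyond the high watermark - i.e. "into
    /// the future" - reported immediately rather than at read time.
    SequenceNumberAfterWatermark { offset: i64, high_watermark: i64 },
}

fn validate_seek(offset: i64, high_watermark: i64) -> Result<(), SeekError> {
    if offset > high_watermark {
        return Err(SeekError::SequenceNumberAfterWatermark {
            offset,
            high_watermark,
        });
    }
    Ok(())
}

fn main() {
    // Seeking past the watermark fails at seek time, with full context.
    assert!(validate_seek(43, 42).is_err());
    assert!(validate_seek(1, 42).is_ok());
}
```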
Removes each entry from the partition cache when it is hit, as each
partition should be looked up at most once.
This amortises the memory usage of the cache, as it is progressively
"drained" of hot partitions.
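The take-on-hit behaviour, as a tiny illustrative sketch (a bare
`HashMap` stands in for the real cache type):

```rust
use std::collections::HashMap;

fn main() {
    let mut cache: HashMap<i64, &str> = HashMap::from([(1, "partition-a")]);

    // The first lookup is a hit and removes the entry...
    assert_eq!(cache.remove(&1), Some("partition-a"));
    // ...so a second lookup for the same partition misses, and the memory
    // for that entry has already been released.
    assert_eq!(cache.remove(&1), None);
}
```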
Cache the 10,000 most recent partitions at startup, and share them
across all shards.
At the time of this commit there are approximately 8,000 partitions per
day, per ingester, so this should cache all of the partitions created so
far on a given day at startup.
This commit implements a PartitionCache decorator over the
PartitionProvider abstraction.
When an ingester starts up, the internal data structures are empty and
are lazily initialised for each namespace / table / partition as they
are observed in the stream of DML ops.
This lazy initialisation includes resolving the Partition ID and last
persisted sequence number offset value from the catalog for each
partition in each table in each namespace for which an op is observed -
this occurs in the hot path, while blocking ingest for a shard.
Because resolving each partition requires a catalog query, this can
cause a spike in queries against the catalog and unnecessarily slow
ingester recovery - we're effectively lazily warming a cache of
PartitionData in the hot path!
Instead this cache can be used to pre-warm the N most recently created
partitions (which are likely to have ongoing writes) at startup to
eliminate the hot-path overhead and associated catalog queries.
NOTE: unlike most of the other hot-path queries, partition persist
offset resolution cannot be eliminated by changes to the Kafka wire
format.
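A minimal sketch of the decorator shape, assuming a simplified,
synchronous `PartitionProvider` trait (the real trait and types are
async and carry more state):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the per-partition buffer state.
#[derive(Clone)]
struct PartitionData {
    partition_id: i64,
    max_persisted_sequence_number: Option<i64>,
}

trait PartitionProvider {
    /// Resolve the state for a partition, typically via the catalog.
    fn get_partition(&self, table_id: i64, partition_key: &str) -> PartitionData;
}

/// Read-through decorator: pre-warmed entries are served (and consumed)
/// from memory, anything else falls through to the inner catalog-backed
/// provider.
struct PartitionCache<T> {
    inner: T,
    warmed: Mutex<HashMap<(i64, String), PartitionData>>,
}

impl<T: PartitionProvider> PartitionProvider for PartitionCache<T> {
    fn get_partition(&self, table_id: i64, partition_key: &str) -> PartitionData {
        let key = (table_id, partition_key.to_string());
        if let Some(hit) = self.warmed.lock().unwrap().remove(&key) {
            // Hit: no catalog query, and the entry is dropped because each
            // partition is resolved at most once.
            return hit;
        }
        self.inner.get_partition(table_id, partition_key)
    }
}
```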
Lifts the PartitionProvider initialisation higher in the stack to a
point where a single instance can be used across all shards an ingester
manages.
This is a pre-requisite for sharing a cache of Partitions across all
shards.
Marks many internal data structures as non-pub.
Many remain as they're used across tests / from multiple callers
"peeking", but this limits the scope of false sharing in the future.
Move the initialisation of ShardData (an internal ingester data
structure) into the ingester itself.
Previously callers would initialise the ingester state, and pass it into
the IngesterData constructor.
Removes the "how" of initialising a per-partition buffer structure
(PartitionData) from the per-table buffer (TableData).
This is a cleaner separation of concerns - a table buffer is responsible
for addressing and initialising per-table partitions as necessary, and
buffering of ops for them. It does not have to be concerned with the
series of steps necessary to look up the various bits of data in order
to construct a PartitionData.
This abstract provider can be layered up to provide more complex
behaviours - I intend to add a read-through cache impl that decorates
the catalog impl added in this commit, which should eliminate most
partition queries at ingester startup by utilising the indirection added
here.
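To illustrate the separation of concerns (with hypothetical, simplified
types - the real code is async and the provider also receives shard and
namespace identifiers), the table buffer simply asks its injected
provider for a `PartitionData` rather than knowing how to build one:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-ins for illustration.
struct PartitionData;

trait PartitionProvider: Send + Sync {
    fn get_partition(&self, partition_key: &str) -> PartitionData;
}

/// The table buffer addresses partitions and buffers ops for them; the
/// "how" of constructing a PartitionData (catalog lookups, persist
/// offsets, ...) lives behind the provider.
struct TableData {
    partition_provider: Arc<dyn PartitionProvider>,
    partitions: HashMap<String, PartitionData>,
}

impl TableData {
    fn partition_for(&mut self, partition_key: &str) -> &mut PartitionData {
        let provider = &self.partition_provider;
        self.partitions
            .entry(partition_key.to_string())
            .or_insert_with(|| provider.get_partition(partition_key))
    }
}
```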
* chore: Upgrade to Rust 1.64
* fix: Use iter find instead of a for loop, thanks clippy
* fix: Remove some needless borrows, thanks clippy
* fix: Use then_some rather than then with a closure, thanks clippy
* fix: Use iter retain rather than filter collect, thanks clippy
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: do not run de-dup in ingester for querier requests
This removes the entire de-dup logic from the ingester for querier
requests. Furthermore, it even removes the entire DataFusion execution
from the querier request path and just dumps the in-memory record
batches as quickly
as possible. No filters are applied. Note that even prior to this PR,
we've never applied projections (tracked by #5624).
**Pros:**
- faster query planning within the querier (since the ingester response
is needed for state reconciliation)
- lower ingester CPU load
**Cons:**
- more querier<>ingester network traffic
Closes #5602.
* test: extend query test case
* fix: ingester tests
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* fix: retry ingester requests faster
The retries introduced in #5695 are too slow and block the entire
querier for minutes (until the very long gRPC timeout kicks in).
* fix: add error details on why the query planning failed
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Changes many pub fields / methods to be pub(super), or if necessary,
pub(crate).
This helps maintain an internal API boundary for code hygiene, and helps
identify functions that are unused / only used in tests (which I've
annotated with cfg(test) and intend to remove - we should be driving
code under test via the public API rather than using test-only state
mutation, otherwise we're just testing our tests!)
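As a small illustration of the intended boundary (module names are made
up), fields and helpers that only the enclosing module tree needs are
narrowed to `pub(super)`, with `pub(crate)` reserved for genuinely
crate-wide API:

```rust
mod ingester {
    pub(crate) mod data {
        // Usable anywhere in the crate: part of the internal API proper.
        pub(crate) struct ShardData {
            // Visible only within the parent `ingester` module tree,
            // keeping the representation out of the crate-wide API.
            pub(super) partitions: Vec<u64>,
        }

        // Only compiled for tests; a candidate for removal once the tests
        // drive the public API instead of poking internal state.
        #[cfg(test)]
        pub(super) fn test_only_helper() {}
    }
}
```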
This commit changes the prepare_data_to_querier() tests to drive the
ingester state by applying DML ops, thereby driving the prod code
paths (and testing them!) rather than having the tests set up what the
tests believe is the correct internal ingester state, and then asserting
on that state.
This gives us much better coverage of prod code paths, decouples the
tests from the internal state/representation of ingesters (making the
tests less fragile), and removes a bunch of special-cased, test-only
functions that are functionally similar, but not the same as, the prod
functions.
Unblocks #5658, further clean-up to come.
Instead of passing the ShardId into each function for child nodes of the
Shard, store it. This avoids the possibility of mistakenly passing the
wrong value.
A partition belongs to a table - this commit stores the table name in
the PartitionData (which was readily available at construction time)
instead of redundantly passing it into various functions at the risk of
getting it wrong.
When we construct a PartitionData we have the ShardId and TableId. This
commit stores them in the PartitionData for later use, rather than
repeatedly passing them in again when constructing snapshots, at the
risk of passing the wrong IDs.
Callers of Partition::snapshot_to_persisting() pass in the ID of the
partition they are calling it on. The partition already knows what
its ID is, so at best it's redundant, and at worst, inconsistent with
the actual ID.
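A sketch of the resulting shape, using hypothetical simplified types
(the real PartitionData holds buffers, persistence state, etc.):

```rust
/// Hypothetical ID newtypes for illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ShardId(i64);
#[derive(Clone, Copy, Debug, PartialEq)]
struct TableId(i64);
#[derive(Clone, Copy, Debug, PartialEq)]
struct PartitionId(i64);

struct PartitionData {
    // Captured once, at construction time...
    id: PartitionId,
    shard_id: ShardId,
    table_id: TableId,
    table_name: String,
}

impl PartitionData {
    fn new(id: PartitionId, shard_id: ShardId, table_id: TableId, table_name: String) -> Self {
        Self { id, shard_id, table_id, table_name }
    }

    /// ...so snapshotting no longer takes the IDs (or table name) as
    /// parameters, removing the chance of a caller passing inconsistent
    /// values.
    fn snapshot_to_persisting(&self) -> (PartitionId, ShardId, TableId, &str) {
        (self.id, self.shard_id, self.table_id, &self.table_name)
    }
}
```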
Changes the ingest code path to eliminate scanning the parquet_files
table to discover the last persisted offset per partition, instead
utilising the new persisted_sequence_number field on the Partition
itself to read the same value.
This lookup blocks ingest for the shard, so removing the expensive query
from the ingest hot path should improve catch-up time after a
restart/deployment.
Changes the ingester to record the per-partition maximum persisted
sequencer offset in the catalog. This will enable quick O(1) lookup in
the future, but the currently persisted value is only used to assert the
per-partition monotonic persist ordering invariant.
Assert the per-shard / per-partition persistence watermarks
monotonically increase, and document the invariant.
NOTE: this is not a new invariant, just a new assertion to validate it.
The maximum persisted sequence number is tracked to answer "up to where
has this partition been persisted", used for querying and skipping
writes that have already been applied (though I suspect this is
redundant).
This is a property of the partition, not the actual data buffer, so this
commit hoists it up out of the data buffer and onto the per-partition
data structure, internalising the field in the process (not pub).
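A minimal sketch of the hoisted watermark and the assertions built on it
(names and types are stand-ins for the real per-partition data
structure):

```rust
/// Illustrative per-partition persistence state.
#[derive(Default)]
struct PartitionPersistState {
    /// Highest sequence number known to be persisted for this partition -
    /// a property of the partition, not of any particular data buffer.
    max_persisted_sequence_number: Option<i64>,
}

impl PartitionPersistState {
    fn mark_persisted(&mut self, sequence_number: i64) {
        // Persist watermarks must only ever move forwards; a lower value
        // means the monotonic ordering invariant has been violated.
        if let Some(current) = self.max_persisted_sequence_number {
            assert!(
                sequence_number > current,
                "persist watermark moved backwards: {current} -> {sequence_number}"
            );
        }
        self.max_persisted_sequence_number = Some(sequence_number);
    }

    /// Answers "up to where has this partition been persisted?", used to
    /// skip writes that have already been applied.
    fn should_skip(&self, op_sequence_number: i64) -> bool {
        matches!(self.max_persisted_sequence_number, Some(max) if op_sequence_number <= max)
    }
}
```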
I would like to remove de-dup from the querier<>ingester query path in #5602,
but the tombstone application and parquet write path can still do the full work.
Changes the ingester to log ALL of the reasons a partition is marked
for persistence, rather than just one of the four as previously. Fields
are consistent, but verbose / repetitive.
Also cleans up some misleading messages like "updating ..." to be logged
only when the update actually takes place.
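The shape of the change, as a sketch - the specific triggers and
thresholds below are made-up placeholders, not the ingester's actual
limits:

```rust
/// Illustrative partition state used to evaluate persist triggers.
struct PartitionStatus {
    partition_id: i64,
    buffered_rows: usize,
    buffered_bytes: usize,
    seconds_since_first_write: u64,
}

/// Collect every reason that currently applies, instead of returning on
/// the first match.
fn persist_reasons(p: &PartitionStatus) -> Vec<&'static str> {
    let mut reasons = Vec::new();
    if p.buffered_rows >= 500_000 {
        reasons.push("row count over limit");
    }
    if p.buffered_bytes >= 100 * 1024 * 1024 {
        reasons.push("buffer size over limit");
    }
    if p.seconds_since_first_write >= 1800 {
        reasons.push("data buffered too long");
    }
    reasons
}

fn maybe_mark_for_persistence(p: &PartitionStatus) {
    // Log every reason with a consistent set of fields.
    for reason in persist_reasons(p) {
        println!(
            "partition_id={} reason={reason:?} marking partition for persistence",
            p.partition_id
        );
    }
}
```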
In our data model, a chunk always belongs to a partition[^1], so let's
not make this attribute optional. The optional value only leads to
-- mostly surprising -- conditional behavior, ranging from "do not equalize
the partition sort key" (querier) to "always consider the chunk overlapping"
(iox_query when dealing with ingester chunks).
[^1]: This is even true when the chunk belongs to a parquet file that is not
yet added to the catalog, contrary to what a comment in the ingester
stated. The catalog and data model used by the querier are two totally
different things.
* refactor: remove dead code
* refactor: `Deduplicator::build_scan_plan` consumes `self`
There is no good reason to use the same `Deduplicator` twice. In
contrast, I'm quite sure that this would lead to nasty bugs, because
`split_overlapped_chunks` exits early in some cases, so the 2nd plan
would have old and new chunks mixed together (see the sketch after this
list).
* ci: use same feature set in `build_dev` and `build_release`
* ci: also enable unstable tokio for `build_dev`
* chore: update tokio to 1.21 (to fix console-subscriber 0.1.8)
* fix: "must use"
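A minimal sketch of the consuming-`self` pattern referenced above
(stand-in types; the real `build_scan_plan` produces a DataFusion plan):

```rust
/// Hypothetical stand-ins for illustration.
struct ScanPlan;

struct Deduplicator {
    split_chunks: Vec<String>,
}

impl Deduplicator {
    /// Consuming `self` means a second call does not compile, so stale
    /// per-plan state (e.g. previously split chunks) can never leak into
    /// a second plan.
    fn build_scan_plan(mut self) -> ScanPlan {
        self.split_chunks.push("chunk".to_string());
        ScanPlan
    }
}

fn main() {
    let dedup = Deduplicator { split_chunks: vec![] };
    let _plan = dedup.build_scan_plan();
    // let _plan2 = dedup.build_scan_plan(); // error[E0382]: use of moved value
}
```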
This limit restricts a single partition to containing at most N rows
before it is marked for persistence (note: being marked for persistence
does not currently prevent further ingest for that partition.)
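A tiny sketch of the check (field and parameter names are illustrative,
not the real configuration):

```rust
/// Illustrative per-partition buffer.
struct PartitionBuffer {
    buffered_rows: usize,
    marked_persisting: bool,
}

impl PartitionBuffer {
    fn buffer_rows(&mut self, rows: usize, max_rows_per_partition: usize) {
        // Rows are buffered regardless: being marked for persistence does
        // not currently stop further ingest for this partition.
        self.buffered_rows += rows;
        if self.buffered_rows >= max_rows_per_partition {
            self.marked_persisting = true;
        }
    }
}
```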
* feat: initial implementation of memory estimation for a compaction
* feat: estimate size of files and have the right actions for the needed budget
* feat: run candidates in parallel
* fix: have the right name for the column field of the output struct
* feat: add metrics for estimated budgets
* chore: cleanup
* chore: Apply suggestions from code review
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* fix: fix syntax after applying review's suggestions
* refactor: Convert a Vec to a VecDeque to work well with pop and push (see the sketch after this list)
* chore: remove max_concurrent_size_bytes and input_size_threshold_bytes
* chore: remove input_file_count_threshold
* test: tests for estimate_arrow_bytes_for_file
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
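A sketch of the budget-driven candidate loop described in the list above
(the estimate, budget, and names are made up; in the real code each
selected batch of candidates is then compacted in parallel):

```rust
use std::collections::VecDeque;

/// Illustrative compaction candidate with a pre-computed memory estimate.
struct Candidate {
    partition_id: i64,
    estimated_bytes: u64,
}

/// Pop candidates from the front until the next one would exceed the
/// remaining budget; anything that does not fit is pushed back and waits
/// for a later cycle.
fn next_batch(candidates: &mut VecDeque<Candidate>, memory_budget_bytes: u64) -> Vec<Candidate> {
    let mut batch = Vec::new();
    let mut used = 0u64;

    while let Some(candidate) = candidates.pop_front() {
        if used + candidate.estimated_bytes > memory_budget_bytes {
            candidates.push_front(candidate);
            break;
        }
        used += candidate.estimated_bytes;
        batch.push(candidate);
    }
    batch
}
```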