In https://github.com/influxdata/influxdb_iox/pull/5754 I added code at
seek() time to check whether the offset exists, and to refuse to seek if
it does not - effectively making the existing read-time check redundant.
I left it in on the assumption that the cases it previously handled
would still work!
Unfortunately this doesn't seem to be the case - both read-ahead-of-data
and read-behind-data requests appear to cause the high_watermark to be
returned as -1, meaning this code never worked?!
This new read-ahead-of-data match arm took priority over the
SequenceNumberNoLongerExists arm, effectively preventing the ingester
from taking the desired remediation (skipping to most recent write, or
erroring, depending on configuration).
Moves the "you've tried to seek into the future!" error to the point at
which the seek attempt was made.
This makes more sense than deferring the seek error until read time, and
makes the cause far easier to determine than it is at read time (where
the read response error contains an invalid high_watermark value of -1,
making it impossible to conclusively determine what has happened).
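A minimal sketch of the seek-time check described above, using
hypothetical type, field, and error names rather than the real write
buffer / rskafka interfaces:

```rust
// Minimal sketch only - the types, fields, and error variants here are
// hypothetical stand-ins for the real write buffer / rskafka interfaces.
#[derive(Debug)]
enum SeekError {
    /// The caller asked to seek beyond the most recent write.
    OffsetAfterWatermark { requested: i64, watermark: i64 },
}

struct PartitionStream {
    offset: i64,
    /// Offset of the next write; offsets beyond this do not exist yet.
    high_watermark: i64,
}

impl PartitionStream {
    fn seek(&mut self, offset: i64) -> Result<(), SeekError> {
        // Fail fast here: at seek time the watermark is known, so the
        // error can name both values unambiguously instead of surfacing
        // later as a read error carrying a high_watermark of -1.
        if offset > self.high_watermark {
            return Err(SeekError::OffsetAfterWatermark {
                requested: offset,
                watermark: self.high_watermark,
            });
        }
        self.offset = offset;
        Ok(())
    }
}
```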
In staging we observed an ingester panic due to the write buffer stream
yielding a WriteBufferErrorKind::SequenceNumberAfterWatermark,
suggesting the ingester was attempting to read from an offset that
exceeds the current max write offset in Kafka (high watermark offset).
This turned out not to be the case - the partition had a single write at
offset 2, and the ingester was attempting to seek to offset 1. The first
read would fail (offset 1 does not exist) and the error handling did not
account for the high watermark not being correctly set (-1 in the
response).
I have no idea why rskafka returns this watermark / doesn't retry /
etc., but this change will allow the ingesters to recover.
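For illustration only, a hedged sketch of the classification pitfall -
the names and the mapping to remediations are hypothetical, but it shows
why a -1 watermark must be treated as "unknown" rather than compared
against the requested offset:

```rust
/// Hypothetical classification helper: the point is that a
/// high_watermark of -1 in an error response means "unknown" and must
/// not be used to conclude the read was ahead of the data.
#[derive(Debug, PartialEq)]
enum ReadFailure {
    /// Watermark was not populated; fall through to the other error
    /// handling rather than panicking.
    UnknownWatermark,
    /// The requested offset is beyond the most recent write.
    AheadOfData,
    /// The requested offset is no longer (or was never) retained.
    SequenceNumberNoLongerExists,
}

fn classify_read_failure(requested_offset: i64, high_watermark: i64) -> ReadFailure {
    if high_watermark < 0 {
        // -1 tells us nothing about where the requested offset sits
        // relative to the data.
        return ReadFailure::UnknownWatermark;
    }
    if requested_offset > high_watermark {
        ReadFailure::AheadOfData
    } else {
        ReadFailure::SequenceNumberNoLongerExists
    }
}
```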
Remove each entry from the partition cache when it is hit, as each
partition should be looked up at most once.
This amortises the memory usage of the cache, as it is progressively
"drained" of hot partitions.
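A minimal sketch of the drain-on-hit behaviour, assuming a simple map
keyed by some partition identity (the real key and value types will
differ):

```rust
use std::collections::HashMap;

// Sketch only: a pre-warmed map of partition data keyed by an arbitrary
// partition identity.
struct PrewarmedPartitions<K, V> {
    entries: HashMap<K, V>,
}

impl<K: std::hash::Hash + Eq, V> PrewarmedPartitions<K, V> {
    /// Return AND remove the cached entry: each partition is resolved at
    /// most once, so a hit never needs to be served twice, and removing
    /// it lets the cache shrink as hot partitions are drained out of it.
    fn take(&mut self, key: &K) -> Option<V> {
        self.entries.remove(key)
    }
}
```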
Cache the 10,000 most recent partitions at startup, and share them
across all shards.
At the time of this commit there are approximately 8,000 partitions per
day, per ingester, so this should cache all of the partitions created so
far in a given day at startup.
This commit implements a PartitionCache decorator over the
PartitionProvider abstraction.
When an ingester starts up, the internal data structures are empty and
are lazily initialised for each namespace / table / partition as they
are observed in the stream of DML ops.
This lazy initialisation includes resolving the Partition ID and last
persisted sequence number offset value from the catalog for each
partition in each table in each namespace for which an op is observed -
this occurs in the hot path, while blocking ingest for a shard.
Because resolving each partition requires a catalog query, this can
cause a spike in queries against the catalog and unnecessarily slow
ingester recovery - we're effectively lazily warming a cache of
PartitionData in the hot path!
Instead this cache can be used to pre-warm the N most recently created
partitions (which are likely to have ongoing writes) at startup to
eliminate the hot-path overhead and associated catalog queries.
NOTE: unlike most of the other hot-path queries, partition persist
offset resolution cannot be eliminated by changes to the Kafka wire
format.
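A hedged sketch of the decorator shape, with simplified synchronous
signatures and hypothetical fields (the real PartitionProvider trait is
async and carries more identifiers):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Simplified, hypothetical signatures - only the decorator shape matters.
#[derive(Clone)]
struct PartitionData {
    partition_id: i64,
    max_persisted_sequence_number: Option<i64>,
}

trait PartitionProvider: Send + Sync {
    fn get_partition(&self, table_id: i64, partition_key: &str) -> PartitionData;
}

/// Read-through decorator: serve pre-warmed entries from memory, falling
/// back to the inner (catalog-backed) provider on a miss.
struct PartitionCache<T> {
    inner: T,
    prewarmed: Mutex<HashMap<(i64, String), PartitionData>>,
}

impl<T: PartitionProvider> PartitionProvider for PartitionCache<T> {
    fn get_partition(&self, table_id: i64, partition_key: &str) -> PartitionData {
        let key = (table_id, partition_key.to_string());
        // Hit: no catalog round-trip in the hot path.
        if let Some(hit) = self.prewarmed.lock().unwrap().get(&key).cloned() {
            return hit;
        }
        // Miss: resolve against the catalog as before.
        self.inner.get_partition(table_id, partition_key)
    }
}
```

On a hit there is no catalog round-trip; on a miss behaviour is
unchanged, so the decorator is purely an optimisation over the
catalog-backed provider.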
Lifts the PartitionProvider initialisation higher in the stack to a
point where a single instance can be used across all shards an ingester
manages.
This is a pre-requisite for sharing a cache of Partitions across all
shards.
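Illustrative only - the point is simply that the caller now constructs
one provider and each shard holds a cheap Arc clone of it:

```rust
use std::sync::Arc;

// Marker stub standing in for the real trait, just to keep this
// fragment self-contained; the names below are illustrative.
trait PartitionProvider: Send + Sync {}

struct ShardData {
    partition_provider: Arc<dyn PartitionProvider>,
}

/// One provider (and therefore one cache) is constructed by the caller,
/// and every shard shares that same instance.
fn build_shards(n_shards: usize, provider: Arc<dyn PartitionProvider>) -> Vec<ShardData> {
    (0..n_shards)
        .map(|_| ShardData {
            partition_provider: Arc::clone(&provider),
        })
        .collect()
}
```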
Adds a Partition::most_recent_n() method to the catalog interface,
returning the N most recent partitions for a given set of shards.
The most recently created partitions are likely to be currently "hot"
for writes, and are cheap to list.
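A hypothetical sketch of the shape of such a method (the real catalog
interface is async and uses its own ID, result, and error types):

```rust
// Hypothetical shape only - the contract is "up to N of the newest
// partitions, restricted to the given shards, newest first".
struct Partition {
    id: i64,
    shard_id: i64,
    table_id: i64,
    partition_key: String,
}

trait PartitionRepo {
    /// Roughly:
    ///   SELECT * FROM partition WHERE shard_id = ANY($shards)
    ///   ORDER BY id DESC LIMIT $n
    /// Recently created partitions are cheap to list (an ordered index
    /// scan) and are the ones most likely to still be receiving writes.
    fn most_recent_n(&mut self, n: usize, shards: &[i64]) -> Vec<Partition>;
}
```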
Marks many internal data structures as non-pub.
Many remain pub as they're used across tests / by multiple callers
"peeking", but this limits the scope of false sharing in the future.
Move the initialisation of ShardData (an internal ingester data
structure) into the ingester itself.
Previously callers would initialise the ingester state, and pass it into
the IngesterData constructor.
* feat: send only needed projection columns from querier to ingester in case of normal SQL queries
* refactor: push column indices down until we need to convert them to strings
* fix: make the test deterministic
* test: test for the projection pushdown
* test: add asserts for the proj pushdown test
* test: implement projection pushdown for partitions of MockIngesterConnection
* chore: cleanup
* chore: address review comments
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: address review comments
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Removes the "how" of initialising a per-partition buffer structure
(PartitionData) from the per-table buffer (TableData).
This is a cleaner separation of concerns - a table buffer is responsible
for addressing and initialising per-table partitions as necessary, and
buffering of ops for them. It does not have to be concerned with the
series of steps necessary to look up the various bits of data in order
to construct a PartitionData.
This abstract provider can be layered up to provide more complex
behaviours - I intend to add a read-through cache impl that decorates
the catalog impl added in this commit, which should eliminate most
partition queries at ingester startup by utilising the indirection added
here.
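A hedged sketch of the resulting shape, again with simplified
synchronous, hypothetical signatures: the per-table buffer only
addresses partitions and delegates the "how" of constructing a
PartitionData to the injected provider.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Sketch only - it illustrates the separation of concerns, not the real
// ingester types.
#[derive(Clone)]
struct PartitionData {
    partition_id: i64,
}

trait PartitionProvider: Send + Sync {
    /// "How" a partition is resolved (catalog lookups, caching, ...)
    /// lives behind this trait.
    fn get_partition(&self, table_id: i64, partition_key: &str) -> PartitionData;
}

/// The per-table buffer addresses partitions and buffers ops for them;
/// it delegates construction of PartitionData to the injected provider.
struct TableData {
    table_id: i64,
    partition_provider: Arc<dyn PartitionProvider>,
    partitions: HashMap<String, PartitionData>,
}

impl TableData {
    fn partition_for(&mut self, partition_key: &str) -> &mut PartitionData {
        let (table_id, provider) = (self.table_id, Arc::clone(&self.partition_provider));
        self.partitions
            .entry(partition_key.to_string())
            // Only on first observation of a partition key does the
            // provider (catalog, cache, ...) get consulted.
            .or_insert_with(|| provider.get_partition(table_id, partition_key))
    }
}
```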