This commit implements a PartitionCache decorator over the
PartitionProvider abstraction.
When an ingester starts up, the internal data structures are empty and
are lazily initialised for each namespace / table / partition as they
are observed in the stream of DML ops.
This lazy initialisation includes resolving the Partition ID and last
persisted sequence number offset value from the catalog for each
partition in each table in each namespace for which an op is observed -
this occurs in the hot path, while blocking ingest for a shard.
Resolving each partition causes a catalog query; this can cause a spike
in queries against the catalog, also resulting in unnecessarily slow
ingester recovery - we're effectively lazily warming a cache of
PartitionData in the hot path!
Instead this cache can be used to pre-warm the N most recently created
partitions (which are likely to have ongoing writes) at startup to
eliminate the hot-path overhead and associated catalog queries.
NOTE: unlike most of the other hot-path queries, partition persist
offset resolution cannot be eliminated by changes to the Kafka wire
format.
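As a rough sketch of the decorator shape - the trait, type names and
crate choices below are illustrative assumptions, not the actual IOx
definitions:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::Mutex;

// Placeholder types standing in for the real catalog types.
type TableId = i64;
type PartitionKey = Arc<str>;
#[derive(Debug)]
struct PartitionData; // resolved partition ID, persist offset, etc.

#[async_trait]
trait PartitionProvider: Send + Sync {
    async fn get_partition(&self, table_id: TableId, key: PartitionKey) -> PartitionData;
}

/// Read-through cache decorating any inner provider (e.g. the
/// catalog-backed impl), pre-warmed with recently created partitions.
struct PartitionCache<T> {
    inner: T,
    // Entries are removed on first use - each partition is only ever
    // resolved once per ingester lifetime.
    warm: Mutex<HashMap<(TableId, PartitionKey), PartitionData>>,
}

#[async_trait]
impl<T: PartitionProvider> PartitionProvider for PartitionCache<T> {
    async fn get_partition(&self, table_id: TableId, key: PartitionKey) -> PartitionData {
        // Serve a pre-warmed entry without a catalog query, falling
        // back to the inner provider (and thus the catalog) on a miss.
        if let Some(p) = self.warm.lock().remove(&(table_id, key.clone())) {
            return p;
        }
        self.inner.get_partition(table_id, key).await
    }
}
```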
Lifts the PartitionProvider initialisation higher in the stack to a
point where a single instance can be used across all shards an ingester
manages.
This is a pre-requisite for sharing a cache of Partitions across all
shards.
Marks many internal data structures as non-pub.
Many remain as they're used across tests / from multiple callers
"peeking", but this limits the scope of false sharing in the future.
Move the initialisation of ShardData (an internal ingester data
structure) into the ingester itself.
Previously callers would initialise the ingester state, and pass it into
the IngesterData constructor.
Removes the "how" of initialising a per-partition buffer structure
(PartitionData) from the per-table buffer (TableData).
This is a cleaner separation of concerns - a table buffer is responsible
for addressing and initialising per-table partitions as necessary, and
buffering of ops for them. It does not have to be concerned with the
series of steps necessary to look up the various bits of data in order
to construct a PartitionData.
This abstract provider can be layered up to provide more complex
behaviours - I intend to add a read-through cache impl that decorates
the catalog impl in this commit, which should eliminate most partition
queries at ingester startup utilising the indirection added here.
* chore: Upgrade to Rust 1.64
* fix: Use iter find instead of a for loop, thanks clippy
* fix: Remove some needless borrows, thanks clippy
* fix: Use then_some rather than then with a closure, thanks clippy
* fix: Use iter retain rather than filter collect, thanks clippy
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: do not run de-dup in ingester for querier requests
This removes the entire de-dup logic from the ingester for querier
requests. Furthermore, it even removes the entire datafusion execution
from the ingester and just dumps the in-memory record batches as quickly
as possible. No filters are applied. Note that even prior to this PR,
we've never applied projections (tracked by #5624).
**Pros:**
- speed up query planning within the querier (since we need the ingester
response for state reconciling)
- lowered ingester CPU load
**Cons:**
- more querier<>ingester network traffic
Closes #5602.
* test: extend query test case
* fix: ingester tests
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* fix: retry ingester requests faster
The retries introduced in #5695 are too slow and block the entire
querier for minutes (until the very long gRPC timeout kicks in).
* fix: add error details on why the query planning failed
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Changes many pub fields / methods to be pub(super), or if necessary,
pub(crate).
This helps maintain an internal API boundary for code hygiene, and helps
identify functions that are unused / only used in tests (which I've
annotated with cfg(test) and intend to remove - we should be driving
code under test via the public API rather than using test-only state
mutation, otherwise we're just testing our tests!)
This commit changes the prepare_data_to_querier() tests to drive the
ingester state by applying DML ops, therefore driving the prod code
paths (and testing them!) rather than having the tests set up what the
tests believe is the correct internal ingester state, and then asserting
on that state.
This gives us much better coverage of prod code paths, decouples the
tests from the internal state/representation of ingesters (making the
tests less fragile), and removes a bunch of special-cased, test-only
functions that are functionally similar, but not the same as, the prod
functions.
Unblocks #5658, further clean-up to come.
Instead of passing the ShardId into each function for child nodes of the
Shard, store it. This avoids the possibility of mistakenly passing the
wrong value.
A partition belongs to a table - this commit stores the table name in
the PartitionData (which was readily available at construction time)
instead of redundantly passing it into various functions at the risk of
getting it wrong.
When we construct a PartitionData we have the ShardId and TableId. This
commit stores them in the PartitionData for later use, rather than
repeatedly passing them in again when constructing snapshots, at the
risk of passing the wrong IDs.
Partition::snapshot_to_persisting() passes the ID of the partition it is
calling `snapshot_to_persisting()` on. The partition already knows what
its ID is, so at best it's redundant, and at worst, inconsistent with
the actual ID.
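Illustratively - placeholder types and fields below, not the actual
ingester structs:

```rust
use std::sync::Arc;

// Placeholder ID types standing in for the real definitions.
type PartitionId = i64;
type ShardId = i32;
type TableId = i64;

struct PartitionData {
    id: PartitionId,
    shard_id: ShardId,
    table_id: TableId,
    table_name: Arc<str>,
}

impl PartitionData {
    fn new(id: PartitionId, shard_id: ShardId, table_id: TableId, table_name: Arc<str>) -> Self {
        // Captured once at construction time...
        Self { id, shard_id, table_id, table_name }
    }

    // ...and read from self thereafter, so callers can no longer pass
    // an inconsistent shard / table / partition ID into each call.
    fn snapshot_to_persisting(&mut self) -> (PartitionId, ShardId, TableId) {
        (self.id, self.shard_id, self.table_id)
    }
}
```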
Changes the ingest code path to eliminate scanning the parquet_files
table to discover the last persisted offset per partition, instead
utilising the new persisted_sequence_number field on the Partition
itself to read the same value.
This lookup blocks ingest for the shard, so removing the expensive query
from the ingest hot path should improve catch-up time after a
restart/deployment.
Changes the ingester to record the per-partition, maximum persisted
sequencer offsets to the catalog. This will enable quick O(1) lookup in
the future, but the currently persisted value is only used to assert the
per-partition monotonic persist ordering invariant.
Assert the per-shard / per-partition persistence watermarks
monotonically increase, and document the invariant.
NOTE: this is not a new invariant, just a new assertion to validate it.
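A minimal sketch of the assertion, with hypothetical names:

```rust
type SequenceNumber = i64;

// Sketch of the invariant check: persist watermarks must only move
// forward. Field and method names are illustrative.
struct PartitionPersistState {
    max_persisted: Option<SequenceNumber>,
}

impl PartitionPersistState {
    fn mark_persisted(&mut self, offset: SequenceNumber) {
        if let Some(prev) = self.max_persisted {
            // Validates a pre-existing invariant; it does not
            // introduce a new one.
            assert!(
                offset > prev,
                "persist offsets must increase monotonically ({prev} -> {offset})"
            );
        }
        self.max_persisted = Some(offset);
    }
}
```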
The maximum persisted sequence number is tracked to answer "up to where
has this partition been persisted", used for querying and skipping
writes that have already been applied (though I suspect this is
redundant).
This is a property of the partition, not the actual data buffer, so this
commit hoists it up out of the data buffer and onto the per-partition
data structure, internalising the field in the process (not pub).
I would like to remove de-dup from the querier<>ingester query path in #5602,
but the tombstone application and parquet write path can still do the full work.
Changes the ingester to log ALL reasons for a partition being marked for
persistence, rather than just one of the 4 previously. Fields are
consistent, but verbose / repetitive.
Also cleans up some misleading messages like "updating ..." to be logged
only when the update actually takes place.
In our data model, a chunk always belongs to a partition[^1], so let's
not make this attribute optional. The optional value only leads to
-- mostly surprising -- conditional behavior, ranging from "do not equalize
the partition sort key" (querier) to "always consider the chunk overlapping"
(iox_query when dealing with ingester chunks).
[^1]: This is even true when the chunk belongs to a parquet file that is not
yet added to the catalog, contrary to what a comment in the ingester
stated. The catalog and data model used by the querier are two totally
different things.
* refactor: remove dead code
* refactor: `Deduplicator::build_scan_plan` consumes `self`
There is no good reason to use the same `Deduplicator` twice. In
contrast I'm quite sure that this would lead to nasty bugs, because
`split_overlapped_chunks` exits early in some cases so the 2nd plan
would have old and new chunks mixed together.
* ci: use same feature set in `build_dev` and `build_release`
* ci: also enable unstable tokio for `build_dev`
* chore: update tokio to 1.21 (to fix console-subscriber 0.1.8)
* fix: "must use"
This limit restricts a single partition to containing at most N rows
before it is marked for persistence (note: being marked for persistence
does not currently prevent further ingest for that partition).
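Sketched (names assumed):

```rust
// Minimal sketch of the check: mark the partition for persistence once
// the buffered row count reaches the configured limit. Because marking
// does not block further writes, the buffer can exceed max_rows before
// persistence actually runs.
fn should_persist(buffered_rows: usize, max_rows: usize) -> bool {
    buffered_rows >= max_rows
}
```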
* feat: initial implementation of memory estimation for a compaction
* feat: estimate size of files and have the right actions for the needed budget
* feat: run candidates in parallel
* fix: have the right name for the column field of the output struct
* feat: add metrics for estimated budgets
* chore: cleanup
* chore: Apply suggestions from code review
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
* fix: fix syntax after applying review's suggestions
* refactor: Convert a Vec to VecDeque to go well with pop and push
* chore: remove max_concurrent_size_bytes and input_size_threshold_bytes
* chore: remove input_file_count_threshold
* test: tests for estimate_arrow_bytes_for_file
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* fix: do not lose data when Kafka reports that offset is above watermark
This can happen in certain cluster rebalance settings.
This is also linked to https://github.com/influxdata/rskafka/issues/147
but for the upstream issue I currently have no idea how to fix it, so
let's at least harden IOx against it.
Fixes #5128.
* refactor: panic for `SequenceNumberAfterWatermark`
* chore: return a struct with named and documented fields from `compact_persisting_batch`
* docs: Remove extra 'the' and fix a typo
Co-authored-by: Carol (Nichols || Goulding) <carol.nichols@gmail.com>
* feat: `QueryChunk::as_any`
* feat: allow `ChunkPruner::prune_chunks` to fail
* feat: limit per-table chunk data for every query
Closes #5211.
* fix: address review comments
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: remove min_sequence_number
* fix: typos
* fix: remove min_sequencer_number from new files from merging main
* fix: add back throwing error if the compactor compacts files persisted by the ingester after the ingester sends max seq_num back to querier
* test: add test_compactor_collision back but modify the input to make it work with new changes
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
- remove `IOxSessionContext::default()` because untracked contexts
should only be created by tests
- remove `Option<IOxSessionContext>` because it is a typed workaround
for `IOxSessionContext::default`
Tests should use `IOxSessionContext::testing` and all _normal_ users
should create proper contexts.
I suspect this will help tracing or at least prevent silent regressions.
See #5129.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: move parquet chunk ID to `ChunkMeta`
* refactor: return `Arc` from `QueryChunk::summary`
This is similar to how we handle other chunk data like schemas. This
allows a chunk to change/refine its "belief" over its own payload while
it is passed around in the query stack.
Helps w/ #5032.
* test: add test for timestamps in kafka write buffer
* refactor: move timestamp batching test to generic tests
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Instead of using some hand-rolled timestamp-based logic (or just
"unknown") all over the place, just use logic introduced in #5017.
This requires slightly improved table summaries within the querier that
at least have min/max for the timestamp column. For that, the former
`IngesterChunk`-specific `calculate_summary` method was extended to
`create_basic_summary` to include that data and is now also used by
`QuerierParquetChunk`.
Note: `QuerierRBChunk` already has detailed metrics that are provided
by the read buffer implementation.
Should we ever need even better pruning for `QuerierParquetChunk` (or
`IngesterChunk`) then we _only_ need to add extra data to the table
summaries.
Closes #4976.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Went through and replaced all lazy_static uses with once_cell (while waiting for the project to compile). There are still dependencies using lazy_static so it is still in the crate graph, but at least there isn't an explicit dependency on it (and it is easier to update to `std::lazy::Lazy` once that is stable).
* refactor: change level 1 to level 2 preparing for next design changes
* fix: make level-2 consistent everywhere
* chore: remove unused comments
* refactor: change all the names from level_1 to level_2 to completely replace 1 with 2 and make everything consistent
* chore: add corresponding constants for the compaction levels in the comments
Co-authored-by: Dom <dom@itsallbroken.com>
* fix: avoid using min_time, which can be negative, for ChunkId; use the object store id, which is a UUID, instead
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* chore: run fmt
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
The ingester maintains a rough "total memory in use" counter it uses to
try and limit the amount of memory the ingester is using overall.
When a partition is persisted, this total memory usage value is adjusted
to account for releasing the partition memory. Prior to this commit, the
ordering was:
* Writes increase the memory counter
* maybe_persist() is called to trigger persistence
* A partition is identified for persistence
* Partition memory usage is released back to the total memory counter
* Persistence starts
This meant that the partitions in the process of being persisted were
not accounted for in the ingester's total memory counter, and therefore
we could significantly overrun the configured memory limit.
After this commit, the ordering is:
* Writes increase the memory counter
* maybe_persist() is called to trigger persistence
* A partition is identified for persistence
* Persistence starts
* Persistence completes
* Partition memory usage is released back to the total memory counter
This ensures persisting partitions are still tracked in the total memory
counter, causing pauses to correctly fire.
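A sketch of the corrected ordering - the types and helpers here are
hypothetical stand-ins for the real ingester code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct PartitionData;
impl PartitionData {
    fn memory_usage(&self) -> usize {
        0 // stand-in; the real type tracks its buffered bytes
    }
}

async fn persist(_partition: PartitionData) {
    // write parquet, update the catalog, etc.
}

// The partition's memory stays accounted for in the total counter
// until persistence has completed.
async fn persist_and_release(total_memory: &AtomicUsize, partition: PartitionData) {
    let freed = partition.memory_usage();

    persist(partition).await; // persistence starts AND completes first...

    // ...and only then is the memory released back to the counter.
    total_memory.fetch_sub(freed, Ordering::Relaxed);
}
```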
Changes the ingester to use the partition key derived in the router and
transmitted through the Kafka API boundary.
This should have no observable behavioural change, but be more resilient
as we're no longer assuming the partitioning algorithm produces the same
value in both the router (where data is partitioned) and the ingester
(where data is persisted, segregated by partition key).
This is a pre-requisite to allowing the user to specify partitioning
schemes.
* refactor: store per-file column set in catalog
Together with the table-wide schema and the partition-wide sort key, this should
be everything we need to read a parquet file directly into memory
without peeking any file-level metadata.
The querier will use this to directly load parquet files into the read
buffer.
**WARNING: This requires a catalog wipe!**
Ref #4124.
* refactor: use proper `ColumnSet` type
* refactor: stream partitions from ingester
Ref #4849.
* refactor: do not collect record batches on the ingester side
Ref #4849.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: use new ingester<>querier wire protocol
Use and document the new and more flexible ingester<>querier wire
protocol.
Note that the ingester does NOT stream the response data yet, but the
internal data structures would allow that. A follow-up change will
adjust the ingester code to stream the data.
Ref #4849.
* fix: typos
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: clarify naming and public interface
* test: add schema assertion to `ingester_response_to_record_batches`
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: prepare new ingester<>querier protocol on the querier side
This changes the querier internals to work with the new protocol. The
wire protocol stays the same (for now). There's a (somewhat hackish)
adapter in place on the querier side that converts the old to the new
protocol on-the-fly. This is an intermediate step before we actually
change the wire protocol (and in a step after that also take advantage
of the new possibilities on the ingester side).
Ref #4849.
* docs: explain adapter
* chore: TEMP Update DataFusion to pre-release
* chore: update arrow et al to 16.0.0
* chore: Run cargo hakari tasks
* fix: update reader read_dictionary API
* chore: Update to real Datafusion release
* fix: Update parquet API
* fix: update test
Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
This commit changes the code base to use a new reference-counted
PartitionKey type wrapper, instead of passing a bare String around.
This allows the compiler to type check & verify usage of the partition
key, instead of passing a bare string around. By reference counting the
underlying string, we reduce memory usage for some use cases.
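A minimal sketch of the wrapper, assuming an Arc<str>-backed newtype;
the real type may differ in detail:

```rust
use std::sync::Arc;

// Newtype so the compiler distinguishes partition keys from arbitrary
// strings; cloning is a cheap refcount bump, not a full string copy.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionKey(Arc<str>);

impl From<&str> for PartitionKey {
    fn from(s: &str) -> Self {
        Self(Arc::from(s))
    }
}
```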
* feat: Change data type of catalog Postgres partition's sort_key from a string to an array of strings
* test: add column with comma
* fix: use new protobuf field to avoid incompatibility
* fix: ensure sort_key is an empty array rather than NULL
* refactor: address review comments
* refactor: address more comments
* chore: clearer comments
* chore: Update iox_catalog/migrations/20220607102200_change_sort_key_type_to_array.sql
* chore: Update iox_catalog/migrations/20220607102200_change_sort_key_type_to_array.sql
* fix: Rename migration so it will be applied after
Co-authored-by: Marko Mikulicic <mkm@influxdata.com>
* fix: do not return readable until a write is completely readable
* docs: Add diagram with partially buffered write
* refactor: account for actively buffering during update rather than fixup
* fix: fixup
* fix: use checked_sub
Co-authored-by: Marco Neumann <marco@crepererum.net>
* fix: checked_sub calculation
Co-authored-by: Marco Neumann <marco@crepererum.net>
Reduces memory usage in the ingester during persist operations by
streaming the results of the snapshot merge/sort/dedupe directly to
the parquet file.
Prior to this commit the output of the compact was buffered in memory
before being written to the parquet file.
* test: "optimize" ingesterrecord batches in query tests
It seems that I had the right idea in #4656 but wasn't able to trigger
https://github.com/influxdata/conductor/issues/955 because the query
tests do not "optimize" the record batches in the same way the actual
gRPC implementation does. If we apply the same transformation we indeed
end up with the same error.
* fix: all batches within the ingester flight response must have same schema
* refactor: simplify and reuse code
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: enable debugging of failed querier->ingester requests
- extend `query-ingester` CLI to allow usage of predicates
- on failed requests: log all the information required for the CLI
- test the "ingester fails" scenario
* test: explain
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* docs: improve
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: move b64 pred. serde into a single crate
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Removes the min/max timestamp fields from the IoxMetadata proto
structure embedded within a Parquet file's metadata.
These values are redundant as they already exist within the Parquet
column statistics, and precluded streaming serialisation as these
removed min/max values were needed before serialising the file.
Remove the redundant row_count from the IoxMetadata structure that is
serialised into the Parquet file.
The reasoning is twofold:
* The Parquet file's native metadata already contains a row count
* Needing to know the number of rows up-front precludes streaming
Ok, so... this needed lots of... channels. Channels everywhere.
The stream method on TestWriteBufferStreamHandler previously assumed it
would only be called once. In a test where reset_to_earliest is called,
stream might be called again to get the reset stream.
We want to be able to control which of the streams gets which
operations, so that's why the macro now takes a vec of vec of
operations - one vec of operations per expected call to stream, and the
stream will send all the operations in its vec.
The test thread needs to wait for the handler stream to consume the last
item from the last receiver stream, so when the
TestWriteBufferStreamHandler has set up the last expected call to
stream, pass back the last transmitter and have it wait until it's at
full expected capacity (which means all operations have been consumed by
the receiver).
The default behavior of the ingester is to panic if the min unpersisted
sequence number in the catalog is unknown to the write buffer due to the
retention policies having evicted that sequence number.
Specifying `--skip-to-oldest-available` changes this behavior to skip to
the oldest sequence number the write buffer does have available and go
from there.
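Sketched below with an assumed trait shape and error type -
`reset_to_earliest` is the real operation being invoked:

```rust
use async_trait::async_trait;

#[async_trait]
trait WriteBufferStreamHandler {
    async fn seek(&mut self, sequence_number: u64) -> Result<(), String>;
    async fn reset_to_earliest(&mut self);
}

async fn seek_or_skip(
    handler: &mut dyn WriteBufferStreamHandler,
    min_unpersisted: u64,
    skip_to_oldest_available: bool,
) {
    match handler.seek(min_unpersisted).await {
        Ok(()) => {}
        // The write buffer evicted this offset under its retention policy.
        Err(e) if skip_to_oldest_available => {
            eprintln!("sequence number {min_unpersisted} unavailable ({e}), skipping to earliest");
            handler.reset_to_earliest().await;
        }
        Err(e) => panic!("cannot seek to {min_unpersisted}: {e}"),
    }
}
```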
Fixes #4624.
Implements an upload() method on the ParquetStorage type, consuming a
stream of RecordBatch, serialising the Parquet file, and uploading the
result to object storage. Returns the IOx-specific file metadata.
Currently while the upload() method accepts a stream of RecordBatch, the
actual resulting Parquet file is buffered in memory before uploading to
object store, due to lack of streaming upload functionality in the
ObjectStore abstraction - this isn't the end of the world, as the files
tend to be relatively small with our current usage.
This impl should be easily modified to be fully streaming once streaming
object store puts are implemented:
https://github.com/influxdata/object_store_rs/issues/9
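The serialisation step looks roughly like this - a sketch with
simplified names and error handling, not the actual IOx API:

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use parquet::arrow::ArrowWriter;

// Drain the stream into an in-memory Parquet buffer; the caller hands
// the whole buffer to the object store in a single put, buffered until
// streaming puts land in the ObjectStore abstraction (see the linked
// object_store_rs issue).
async fn serialise_to_parquet(
    mut batches: impl Stream<Item = RecordBatch> + Unpin,
    schema: SchemaRef,
) -> Vec<u8> {
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None).expect("invalid schema");
    while let Some(batch) = batches.next().await {
        writer.write(&batch).expect("write batch");
    }
    writer.close().expect("finalise parquet footer");
    buf
}
```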
Changes the code paths that interact with Parquet files in the object
store to reference the ParquetStorage directly (DRY refactor).
This change takes us from a dependency graph of:
┌─────────────────┐
│ │
▼ │
Parquet Consumer │
│ ┌──────────────┐
├────────▶│ParquetStorage│
▼ └──────────────┘
┌──────────────┐
│ ObjectStore │
└──────────────┘
│
┌────┴────┐
▼ ▼
File s3
System (etc)
to:
Parquet Consumer
│
▼
┌──────────────┐
│ParquetStorage│
└──────────────┘
│
▼
┌──────────────┐
│ ObjectStore │
└──────────────┘
│
┌────┴────┐
▼ ▼
File s3
System (etc)
With the ParquetStorage being solely responsible for managing
interactions with the object store when dealing with Parquet files.
Renames the Storage type so the context is clear in usage (i.e. fn
args), rather than having to rely on knowing the fully-qualified import
path to know what the type stores.
* ci: fix cargo deny
* chore: downgrade `socket2`, version 0.4.5 was yanked
* chore: rename `query` to `iox_query`
`query` is already taken on crates.io and yanked and I am getting tired
of working around that.
Emit a TRACE level log containing the op offset & other helpful fields.
This will allow us to identify which messages were last successfully
decoded, and which caused errors, so we can pull them out for analysis.
Adds a histogram metric "flight_query_duration_ms" that records the
duration a flight RPC query takes to complete. Broken down by query
result (success/error).
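Roughly (the metric recording call below is a stand-in, not the actual
IOx `metric` crate API):

```rust
use std::time::Instant;

// Stand-in for recording into "flight_query_duration_ms",
// labelled by query result.
fn record_query_duration(result: &str, millis: u64) {
    let _ = (result, millis);
}

async fn do_get_instrumented<T, E>(
    query: impl std::future::Future<Output = Result<T, E>>,
) -> Result<T, E> {
    let start = Instant::now();
    let res = query.await;
    let outcome = if res.is_ok() { "success" } else { "error" };
    record_query_duration(outcome, start.elapsed().as_millis() as u64);
    res
}
```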
These were found by iterating over all of the dependencies of each
Cargo.toml, then grepping that crate for the dependency's name. If it
didn't show up, I attempted to remove it.
I left a few dependencies that this process flagged:
* generated_types
- `pbjson`,`serde`. Apparently used by the generated code.
* grpc-router-test-gen
- `prost`. Apparently used by the generated code.
* influxdb_iox
- `heappy`. Doesn't appear used, but is behind enough feature
flags that I don't care to reason about and it's already optional.
- `tikv_jemalloc_sys`. Appears to be setting a feature flag of an
indirect dependency.
* iox_gitops_adapter
- `k8s_openapi`. Appears to be setting a feature flag of an indirect
dependency.
* chore: Tool for automating arrow version update
* chore: Update datafusion and arrow/parquet/arrow-flight
* fix: update for changes in Arrow API
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: use stored sort key to deduplicate data
* refactor: verify if one is a super sort key of the other
* test: unit tests for scan and deduplication plans
* fix: typo
* refactor: refactor and add comments
* feat: cache partition sort key to read during planning as needed
* test: tests for query plans with different overlap groups
* chore: cleanup
* chore: resolve merge conflicts
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: improve `IngesterData` public interface
* feat: impl `Debug` for `Test{Namespace,Sequencer}`
* refactor: trait interface for `LifecycleHandle`
This is required to mock the lifecycle for query tests.
* refactor: trait for partitioner
* feat: add per kafka partition durability reporting to write info response
* fix: buf lint + test cleanup
* fix: clean up protobuf
* refactor: pull out conversion of KafkaPartitionStatus into a function
* fix: fmt
* fix: typo
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Attaching the "batch => partition" mapping via per-batch schema KV
metadata does NOT work because flight will transmit the schema once for
all batches (even though on the Rust side we have a schema ref attached
to every batch, probably for convenience). Instead we now use the same
global protobuf metadata that we also use for the "partition => max
sequence number" information. This somewhat limits our ability to create
record batches lazily on the ingester side (since the global metadata is
sent before any actual payload) but I think we should not modify the
usage of the flight protocol too much right now (e.g. by sending more
schema messages). If this becomes an issue, we can always find a more
complex solution in the future.
* fix: return "not found" gRPC error instead of "internal" when ingester does not know table
* fix: properly handle "namespace not found" in ingester queries
* fix: make `initialize_db` work with async code
* test: add custom step for NG tests
* fix: handle "unknown table/namespace" resp. in querier
* docs: explain test setup
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: querier<>ingester flight protocol adjustments
This makes a few adjustments to the querier<>ingester flight protocol.
Query Scope
===========
The querier will request data for ALL sequencer IDs for now. There is
no reason to have a request per sequencer ID. We can add a range/set
filter later if we want, but this is not required for now.
Partition-level
===============
The only time when the querier cares about sequencer IDs (i.e. sharding)
at all is when it selects which ingesters to ask for unpersisted data
(this is currently not implemented, it just asks all ingesters).
Afterwards the querier only cares about partitions (which are bound to
specific sequencers anyways) because this is the level where parquet
file persistence and compaction as well as deduplication happen. So we
make partitions a first-class citizen in the ingester response.
Metadata VS RecordBatches
=========================
The global app-metadata will list all partitions and their max
persisted parquet files and tombstones (theoretically tombstones are at
table-level, but the ingester could in the future break them down to the
partition-level). Then it receives a stream of record batches. Each
record batch is tagged (via key-value metadata in its schema) so it can
be assigned to a partition. At the moment the ingester returns 0 or 1
batches per unpersisted partition (0 in case we've filtered out all the
data via the predicate), but in the future it is free to return multiple
batches. This setup gives the ingester more freedom over memory
management and (potentially parallel) query processing, while at the
same time keeps the set of duplicated information minimal and allows
easy extensions (since the global metadata is a full-blown protobuf
message).
Querier
=======
At the moment the querier ignores all the metadata. Follow-up PRs will
change that.
* docs: improve
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: make code clearer
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Removes the old stream_in_sequenced_entries() write buffer handler,
replacing it with the SequencedStreamHandler introduced in #4203.
This change will affect the metrics emitted by an ingester as outlined
in #4243.
Removes the Sync bound from the SequencedStreamHandler input stream type, as the
BoxStream returned by the WriteBufferStreamHandler is not Sync.
This change means the SequencedStreamHandler is not Sync either, but is
still Send and therefore can be moved into tokio tasks.
This commit adds an adaptor (IngestSinkAdaptor) that provides a DmlSink
implementation for the existing write path (IngesterData). With this,
the existing write path becomes compatible with the new
op stream handler (SequencedStreamHandler).
This commit adds the SinkInstrumentation type that decorates an inner
DmlSink with call latency and write buffer metrics.
The write buffer / sink call metrics may be split apart into two
separate responsibilities in the future if there are multiple DmlSink
implementations that need instrumentation, but adding more types is
deferred until it is needed.
* feat: Add `SequencerProgress` reporting to ingester
* refactor: Use KafkaPartition in write_summary
* fix: Update docstrings
* refactor: Change ingester to use KafkaPartition everywhere
* refactor: add SequencerProgress::combine
* refactor: return new SequencerProgress rather than updating
* fix: distinguish between yes/no/unknown in WriteSummary
* docs: Update data_types2/src/lib.rs
Co-authored-by: Paul Dix <paul@pauldix.net>
Co-authored-by: Paul Dix <paul@pauldix.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Pass the sort key from the catalog through to compact_persisting_batch.
If the sort key is Some, use that. If the sort key is None, compute it
from the data's cardinality with compute_sort_key.
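Sketched, with `compute_sort_key` named in the text and the surrounding
types as placeholders:

```rust
struct SortKey(Vec<String>);
struct Batch; // stands in for the data being persisted

fn compute_sort_key(_batch: &Batch) -> SortKey {
    SortKey(vec![]) // derive from the data's column cardinalities
}

fn resolve_sort_key(catalog_sort_key: Option<SortKey>, batch: &Batch) -> SortKey {
    match catalog_sort_key {
        Some(key) => key,                // stored catalog sort key wins
        None => compute_sort_key(batch), // otherwise compute from the data
    }
}
```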
Connects to #4196.
Adds the PeriodicWatermarkFetcher type responsible for querying write
buffer / Kafka for the maximum sequence number / offset, surfacing any
errors via both logs & metrics.
This high watermark / max offset value is used within the ingest
instrumentation metrics. This use case is tolerant of caching / stale
values, and as such the value is periodically updated to minimise load
on the write buffer.
Instruments the SequencedStreamHandler with a series of new metrics that
record the various error classes observable in the stream handler.
These metrics are labelled with potential_data_loss=true where relevant
to surface potential data loss events for alerting & further review.
Refactors the stream_in_sequenced_entries() into a new impl in the
SequencedStreamHandler type, decoupling the reading / decoding of ops
from Kafka (and associated error handling) from the "what happens to
those ops" concern to ease testing, encapsulate the specifics of "how to
get an op" and improve flexibility.
This is intended to provide robust error handling within what is
reasonably possible (unexpected errors are always unexpected!) while
retaining the existing metrics and functionality. I've also separated
out code that exists in the current impl specifically to drive tests
from the prod code path, instead driving those behaviours through mocks.
As of this commit, the handler is not used - this commit simply adds the
new impl.
Fix the ingester to track the max persisted sequence number per partition.
Ensure replay takes in data from unpersisted partitions.
Simplify the table persist info to not return a max persisted sequence number for the table as that information isn't needed.
Min/max values and distinct counts are already optional, so let's make
the null counts optional as well. This will be helpful for NG to deal w/
partial statistics (e.g. we only populate stats for the time column).
Note that the total count is still mandatory, but we normally have the
chunk/file-level row count at hand.
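Illustratively (not the exact IOx statistics types):

```rust
// Min/max, distinct and null counts are all optional now; the total
// count stays mandatory because the chunk/file-level row count is
// normally known.
struct StatValues<T> {
    min: Option<T>,
    max: Option<T>,
    distinct_count: Option<u64>,
    null_count: Option<u64>, // previously mandatory
    total_count: u64,
}
```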
Set to_delete to the time the file was marked as deleted rather than
true.
Fixes #4059.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
The sort key is optional and currently only produced by `iox_tests`.
Writing it within the ingester/compactor is tracked by #3968. The sort
key is read by the querier (and this will be verified by the query tests
and is required to merge #4103).
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Removed some unnecessary tests as they no longer apply with the new buffer structure. This will hopefully reduce the memory footprint of the ingesters significantly.
Closes #4072
This makes it way easier to dyn-type database implementations. The only
real change is that we make `QueryChunk::Error` opaque. Nobody is going
to inspect that anyways, it's just printed to the user.
This is a follow-up of #4053.
Ref #3934.
Emit a counter metric "ingest_paused_duration_ms_total" that records the
duration of time an ingester stream is paused with millisecond
granularity.
This metric will allow us to measure the frequency and severity of, and
alert on, an ingester stopping ingest due to memory limits enforced by
the LifecycleManager. This will help us tune these config params.
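A hypothetical sketch of how the paused duration is accumulated (the
real lifecycle manager and metric types differ):

```rust
use std::time::{Duration, Instant};

trait LifecycleHandle {
    fn can_resume(&self) -> bool;
}

struct PausedCounter; // stands in for "ingest_paused_duration_ms_total"
impl PausedCounter {
    fn inc(&self, _millis: u64) {}
}

async fn pause_until_resumed(lifecycle: &impl LifecycleHandle, paused_ms: &PausedCounter) {
    let started = Instant::now();
    while !lifecycle.can_resume() {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // A counter, so repeated pauses accumulate into a total duration.
    paused_ms.inc(started.elapsed().as_millis() as u64);
}
```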