* feat: introduce a new way of handling max_sequence_number for ingester, compactor and querier
* chore: cleanup
* feat: new column max_l0_created_at to order files for deduplication
* chore: cleanup
* chore: debug info for changing cpu.parquet
* fix: update test parquet file
Co-authored-by: Marco Neumann <marco@crepererum.net>
* feat: introduce scratchpad store for compactor
Use an intermediate in-memory store (can be a disk later if we want) to
stage all inputs and outputs of the compaction. The reasons are:
- **fewer IO ops:** DataFusion's streaming IO requires slightly more
IO requests (at least 2 per file) due to the way it is optimized to
read as little as possible. It first reads the metadata and then
decides which content to fetch. In the compaction case this is (esp.
w/o delete predicates) EVERYTHING. So in contrast to the querier,
there is no advantage to this approach; on the contrary, it easily adds
100ms of latency to every single input file.
- **less traffic:** For divide&conquer partitions (i.e. when we need to
run multiple compaction steps to deal with them) it is kinda pointless
to upload an intermediate result just to download it again. The
scratchpad avoids that.
- **higher throughput:** We want to limit the number of concurrent
DataFusion jobs because we don't wanna blow up the whole process by
having too much in-flight arrow data at the same time. However, while
we performed the actual computation, we were waiting on object store
IO. This limited our throughput substantially.
- **shadow mode:** De-coupling the stores in this way makes it easier to
implement #6645.
Note that we assume here that the input parquet files are WAY SMALLER
than the uncompressed Arrow data during compaction itself.
Closes #6650.
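A rough sketch of the scratchpad idea described above, using purely illustrative names and a plain in-memory map rather than the real object-store-backed implementation: inputs are downloaded into the staging area once, intermediate compaction steps only read and write the staging area, and only final outputs are handed back for upload.

```rust
use std::collections::HashMap;

/// In-memory staging area keyed by file name (illustrative sketch only).
#[derive(Debug, Default)]
struct Scratchpad {
    files: HashMap<String, Vec<u8>>,
}

impl Scratchpad {
    /// Stage an input file that was downloaded from the real object store
    /// exactly once.
    fn stage_input(&mut self, name: &str, bytes: Vec<u8>) {
        self.files.insert(name.to_owned(), bytes);
    }

    /// Read a staged file; intermediate compaction steps use this instead of
    /// issuing further object store GET requests.
    fn read(&self, name: &str) -> Option<&[u8]> {
        self.files.get(name).map(Vec::as_slice)
    }

    /// Write an intermediate or final compaction result into the staging area.
    fn write(&mut self, name: &str, bytes: Vec<u8>) {
        self.files.insert(name.to_owned(), bytes);
    }

    /// Hand the final outputs to the caller for upload to the real store.
    fn into_outputs(self) -> HashMap<String, Vec<u8>> {
        self.files
    }
}
```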
* fix: panic on shutdown
* refactor: remove shadow scratchpad (for now)
* refactor: make scratchpad safe to use
* chore: Update Datafusion and arrow/arrow-flight/parquet to `28.0.0`
* chore: Update thrift to 0.17
* fix: use workspace arrow-flight in ingester2
* chore: Update for API changes
* fix: test
* chore: Update hakari
* chore: Update hakari again
* chore: Update trace_exporters to latest thrift
* fix: update test
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* chore: Update datafusion pin + api code
* chore: Run cargo hakari tasks
* refactor: make combine_sort_key more idiomatic and add rationale comments
* refactor: satisfy borrow checker and updated comments
* fix: Add test case for combine_sort_key
* fix: Apply suggestions from code review
Co-authored-by: Marco Neumann <marco@crepererum.net>
* fix: Add back test for deeply nested expression
* fix: Update output ordering
Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: simplify `QueryChunk` data access
We have only two types for chunks (now that the RUB is gone):
1. In-memory RecordBatches
2. Parquet files
Loads of logic is duplicated in the different `read_filter`
implementations. Also `read_filter` hides a solid amount of logic from
DataFusion, which will prevent certain (future) optimizations. To enable #5897
and to simplify the interface, let the chunks return the data (batches
or metadata for parquet files) directly and let `iox_query` perform the
actual heavy-lifting.
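An illustrative sketch (simplified, not the exact IOx types) of the resulting interface: a chunk only hands over its raw data, and `iox_query` decides how to turn it into a DataFusion plan.

```rust
use datafusion::arrow::record_batch::RecordBatch;

/// What a `QueryChunk` hands back directly (simplified).
enum QueryChunkData {
    /// Data that already lives in memory, e.g. from the ingester.
    RecordBatches(Vec<RecordBatch>),
    /// A parquet file in object storage; only the information needed to build
    /// a `ParquetExec` is exposed, the actual scan is left to DataFusion.
    Parquet(ParquetInput),
}

/// Hypothetical descriptor with what is needed to scan one parquet file.
struct ParquetInput {
    object_store_path: String,
    file_size_bytes: usize,
}
```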
* docs: improve
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
* docs: improve
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
We basically assume everywhere that a column falls into one of the three
known categories (time, tag, field), so let's encode this in our type
system instead of defining "unknown" as "undefined behavior, may or may
not crash".
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
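A minimal sketch of what "encode this in our type system" means; the names loosely mirror the IOx schema crate but are not guaranteed to match it exactly.

```rust
/// Every column is exactly one of these; an "unknown" category cannot be
/// represented at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnCategory {
    /// The `time` column.
    Timestamp,
    /// A dictionary-encoded tag column.
    Tag,
    /// A field column with a concrete value type.
    Field(FieldType),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FieldType {
    Float,
    Integer,
    UInteger,
    String,
    Boolean,
}
```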
Now that we read through `ParquetExec`, `ROW_GROUP_READ_SIZE` is no longer
used.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Use the proper top-level DataFusion context and register the object
store there.
Note that we still hide the `ParquetExec` behind an opaque record batch
stream. Fixing that is next on my list.
Helps with #5897.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
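Roughly what the registration looks like against a recent public DataFusion API; the exact `register_object_store` signature varies between DataFusion versions, and the `iox://` URL and in-memory store here are only example choices to keep the sketch self-contained.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::memory::InMemory;
use url::Url;

fn register_store(ctx: &SessionContext) {
    // Any `ObjectStore` implementation works; in-memory is used here only to
    // keep the example self-contained.
    let store = Arc::new(InMemory::new());
    let url = Url::parse("iox://iox/").expect("valid URL");
    // Registered once on the top-level context; every plan built from this
    // context (including `ParquetExec`) can then resolve `iox://` paths.
    ctx.runtime_env().register_object_store(&url, store);
}
```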
Use a proper typed stream instead of peeking the first element. This is
more in line with our remaining stack and shall also improve error
handling.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* chore: Improve debug logging when parquet files are created
* fix: add duration for encoding parquet
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* ci: use same feature set in `build_dev` and `build_release`
* ci: also enable unstable tokio for `build_dev`
* chore: update tokio to 1.21 (to fix console-subscriber 0.1.8)
* fix: "must use"
Remove our own hand-rolled logic and let DataFusion read the parquet
files.
As a bonus, this now supports predicate pushdown to the deserialization
step, so we can use parquet files as an in-memory buffer.
Note that this currently uses some "nested" DataFusion hack due to the
way the `QueryChunk` interface works. Mid-term I'll change the interface
so that the `ParquetExec` nodes are directly visible to DataFusion
instead of some opaque `SendableRecordBatchStream`.
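For illustration only, here is the same direction expressed with the public DataFusion `SessionContext` API (IOx wires the scan up differently; the file path and column names below are made up, and whether filters are actually evaluated during decoding depends on the DataFusion version and its parquet options):

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn scan_cpu(ctx: &SessionContext) -> Result<Vec<RecordBatch>> {
    let df = ctx
        .read_parquet("data/cpu.parquet", ParquetReadOptions::default())
        .await?
        // Predicates like this one can be pushed towards the parquet
        // deserialization step (row-group/page pruning, late materialization).
        .filter(col("host").eq(lit("server-a")))?;
    df.collect().await
}
```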
Previously, when attempting to serialise a stream of one or more
RecordBatches containing no rows (which would result in an empty file), the
parquet serialisation code would panic.
This changes the code path to raise an error instead, to support the
compactor making multiple splits at once, which may overlap a single
chunk:
  ────────────── Time ────────────▶
        │                    │
┌█████──────────────────────█████┐
│█████  │      Chunk 1       │  █████│
└█████──────────────────────█████┘
        │                    │
        │                    │
     Split T1             Split T2
In the example above, the chunk has an unusual distribution of write
timestamps over the time range it covers, with all data having a
timestamp before T1 or after T2. When running a SplitExec to slice
this chunk at T1 and T2, the middle of the resulting 3 subsets will
contain no rows. Because we store only the min/max timestamps in the
chunk statistics, it is unfortunately impossible to prune one of these
split points from the plan ahead of time.
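A minimal sketch (with hypothetical names, not the real serialiser) of the changed behaviour: the serialiser reports "no rows" instead of panicking, and the compactor can simply skip cataloguing a file for that split.

```rust
use datafusion::arrow::record_batch::RecordBatch;

/// Hypothetical outcome of serialising one split.
enum SerialiseOutcome {
    /// At least one row was written; carries the encoded parquet bytes.
    File(Vec<u8>),
    /// The stream contained no rows, so no file was produced.
    NoRows,
}

fn serialise(batches: &[RecordBatch]) -> SerialiseOutcome {
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    if rows == 0 {
        // Previously this case panicked deep inside the parquet writer.
        return SerialiseOutcome::NoRows;
    }
    // Actual parquet encoding elided in this sketch.
    SerialiseOutcome::File(Vec::new())
}
```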
* fix: set batch size back to 1024 to see if the OOM in the compactor goes away
* fix: address review comments
* chore: Apply suggestions from code review
Co-authored-by: Marco Neumann <marco@crepererum.net>
* fix: import needed constant
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* refactor: increase batch size when reading parquet
This reduced our overhead when reading parquet files quite a lot.
In an internal benchmark, this reduces the time to perform a single
series aggregation of a rather large series with cold caches from 58s to
48s. No real difference could be measured for warm
caches (~21ms for both).
This should also help the compactor since the record batches should be
larger.
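For reference, this is the knob in question expressed against the plain `parquet` crate API rather than the IOx reader wrapper; 8192 is only an example value.

```rust
use std::error::Error;
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_with_large_batches(file: File) -> Result<(), Box<dyn Error>> {
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        // Fewer, larger `RecordBatch`es mean less per-batch overhead.
        .with_batch_size(8192)
        .build()?;
    for batch in reader {
        let _batch = batch?;
    }
    Ok(())
}
```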
* refactor: ensure that parquet row group size is in-sync
Ensure that we use the same row group size for reading and writing
parquet files. This is the same value as upstream currently uses as a
default, but let's make sure we don't diverge from that:
3032a521c9/parquet/src/file/properties.rs (L65)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
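A sketch of the approach: a single shared constant drives the writer properties so the value cannot silently drift from what the read path assumes (1024 * 1024 rows is the upstream parquet-rs default referenced above; the constant name is illustrative).

```rust
use parquet::file::properties::WriterProperties;

/// Keep in sync with the upstream parquet-rs default row group size.
const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;

fn writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(ROW_GROUP_WRITE_SIZE)
        .build()
}
```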
* refactor: remove min_sequence_number
* fix: typos
* fix: remove min_sequencer_number from new files introduced when merging main
* fix: add back the error thrown if the compactor compacts files persisted by the ingester after the ingester has sent its max sequence number to the querier
* test: add test_compactor_collision back but modify the input to make it work with the new changes
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Instead of using some hand-rolled timestamp-based logic (or just
"unknown") all over the place, just use logic introduced in #5017.
This requires slightly improved table summaries within the querier that
at least have min/max for the timestamp column. For that, the former
`IngesterChunk`-specific `calculate_summary` method was extended to
`create_basic_summary` to include that data and is now also used by
`QuerierParquetChunk`.
Note: `QuerierRBChunk` already has detailed metrics that are provided
by the read buffer implementation.
Should we ever need even better pruning for `QuerierParquetChunk` (or
`IngesterChunk`), then we _only_ need to add extra data to the table
summaries.
Closes #4976.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
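A rough sketch (with simplified stand-in types) of the minimum that `create_basic_summary` has to provide so the generic pruning logic from #5017 can be reused: a row count and min/max for the `time` column.

```rust
/// Simplified min/max statistics for the `time` column.
struct TimeRange {
    min_ns: i64,
    max_ns: i64,
}

/// Simplified table summary holding only what pruning needs.
struct BasicSummary {
    row_count: u64,
    time: TimeRange,
}

fn create_basic_summary(row_count: u64, min_ns: i64, max_ns: i64) -> BasicSummary {
    BasicSummary {
        row_count,
        time: TimeRange { min_ns, max_ns },
    }
}
```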
Prior to this change, background tasks that we feed into `AdapterStream`
could panic, but that would just end the stream without any user-visible
error (except for the panic message on stdout/stderr).
This was found while developing #4964. I have proposed another fix in #4966
but found that I had actually re-implemented an existing solution:
`watch_task`. But I also see a major issue with the existing API: one
can create `AdapterStream` with ordinary tokio tasks that are not
watched at all, leaving the burden to the implementor to check for that
(and actually we forgot that in `parquet_file`).
So this change takes a slightly different approach:
The `AdapterStream` does NOT accept ordinary join handles any longer but
requires that you pass a "watched task". The newly introduced
`WatchedTask` does the same as we did manually before: wrapping a future
into a tokio task, watching it, and wrapping the watcher into a task.
It is now way more difficult to do anything stupid (sure you can still
mix up the tasks and the channels, but we need at least some flexibility
here to allow for "split" and potential future fan-in/out constructs).
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
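A simplified sketch of the `WatchedTask` idea (the real IOx types differ): the payload future is spawned as a tokio task and a watcher task observes its `JoinHandle`, so a panic becomes a user-visible error on the channel instead of a silently ended stream.

```rust
use std::future::Future;

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Wraps a background future together with a watcher task.
struct WatchedTask {
    watcher: JoinHandle<()>,
}

impl WatchedTask {
    fn new<F>(fut: F, errors: mpsc::Sender<String>) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let inner = tokio::spawn(fut);
        let watcher = tokio::spawn(async move {
            if let Err(e) = inner.await {
                // A panic (or cancellation) in the payload task surfaces as an
                // error instead of silently ending the stream.
                let _ = errors.send(format!("background task failed: {e}")).await;
            }
        });
        Self { watcher }
    }

    /// Wait for the watcher (and therefore the payload task) to finish.
    async fn join(self) {
        let _ = self.watcher.await;
    }
}
```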
* refactor: change level 1 to level 2 preparing for next design changes
* fix: make level-2 consistent everywhere
* chore: remove unused comments
* refactor: rename level_1 to level_2 everywhere to completely replace 1 with 2 and make everything consistent
* chore: add corresponding constants for the compaction levels in the comments
Co-authored-by: Dom <dom@itsallbroken.com>
* refactor: remove `DecodedParquetFile` from `iox_tests`
* refactor: remove `DecodedParquetFile` from querier
Also pull out all the chunk schema and sort key handling into a function
so that RB chunks and parquet chunks mostly use the same code path.
* refactor: remove `DecodedParquetFile`
* refactor: remove `ParquetFileWithMetadata` usage
* fix: test data consistency
* fix: avoid using min_time, which can be negative, for ChunkId; use the object store ID (a UUID) instead
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* chore: run fmt
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: conversion from `ParquetFile` to `ParquetFilePath`
* refactor: slim down parquet chunk
- ensure it works without binary parquet metadata
- timestamp range is no longer optional (ensured by the NG type system)
- remove table summary: this is only needed for SOME API users. The
compactor can work perfectly without statistics since it has the timestamp
range, which is sufficient for the current overlap check (we don't use
any other primary key stats at the moment). The querier currently does
NOT use parquet chunks (they were replaced by the read buffer), but if it
does again at some point in the future it will likely need to find a way to fetch and
cache the statistics.
- the schema is now provided by the API user since it can be
reconstructed using the NG catalog only (and "wrong" column orders are
tolerated as of #4921)
Ref #4124
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
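A hypothetical sketch of the slimmed-down chunk described in the list above: the caller provides the schema (reconstructed from the NG catalog), the timestamp range is mandatory, and there is no embedded parquet metadata or table summary.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;

/// Simplified stand-in for the mandatory timestamp range.
struct TimestampRange {
    min_ns: i64,
    max_ns: i64,
}

/// Slimmed-down parquet chunk: no binary metadata, no statistics; the schema
/// comes from the API user instead of the file.
struct ParquetChunk {
    schema: Arc<Schema>,
    timestamp_range: TimestampRange,
    object_store_path: String,
    file_size_bytes: usize,
}
```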
* fix: column handling when reading parquet files
This improves/fixes/tests a few aspects when reading parquet files:
- fix usage of `Selection::Some(...)`. This was broken since #4912 but
apparently no test caught that.
- ensure that the order of `Selection::Some(...)` is preserved
- ensure that schema metadata is attached to output batches
- ignore parquet columns that we don't care about (i.e. do not select)
- allow the parquet file to have a different column order than our internal
bookkeeping; this makes it way simpler to read parquet files w/o
scanning the metadata first
- extend the test coverage
Ref #4124.
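A small self-contained illustration (simplified types, not the IOx reader code) of the ordering rule being tested: columns are projected by name, in the order the caller asked for, regardless of the physical column order in the file, and unknown columns are ignored.

```rust
/// Map a requested column selection to indices in the file's column order.
fn projection_indices(file_columns: &[&str], selection: &[&str]) -> Vec<usize> {
    selection
        .iter()
        .filter_map(|wanted| file_columns.iter().position(|c| c == wanted))
        .collect()
}

#[test]
fn selection_order_is_preserved() {
    // The file stores columns in a different order than we request them.
    let file_columns = ["time", "host", "usage"];
    // Columns we don't know about ("missing") are simply ignored.
    let selection = ["usage", "time", "missing"];
    assert_eq!(projection_indices(&file_columns, &selection), vec![2, 0]);
}
```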
* test: even more tests for parquet reader
* refactor: `TestPartition::update_sort_key` should return an `Arc`
The whole test framework is built around `Arc`s, so let's fix this
consistency issue.
* fix: actually calculate correct column set in test framework
* feat: check expected parquet file schema
While working on the querier I made some mistakes regarding schemas, and
such a check would have greatly improved the debugging experience.
* feat: namespace cache expiration
* fix: improve parquet schema check
* fix: remove clone
* refactor: store per-file column set in catalog
Together with the table-wide schema and the partition-wide sort key, this should
be everything we need to read a parquet file directly into memory
without peeking at any file-level metadata.
The querier will use this to directly load parquet files into the read
buffer.
**WARNING: This requires a catalog wipe!**
Ref #4124.
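A hypothetical sketch of such a per-file column set: just the set of catalog column IDs that actually occur in the file, stored next to the file's catalog record so the reader can build a projection without opening the file first.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for a catalog column ID.
type ColumnId = i64;

/// Set of columns that actually occur in one parquet file.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnSet(BTreeSet<ColumnId>);

impl ColumnSet {
    fn new(ids: impl IntoIterator<Item = ColumnId>) -> Self {
        Self(ids.into_iter().collect())
    }

    fn contains(&self, id: ColumnId) -> bool {
        self.0.contains(&id)
    }
}
```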
* refactor: use proper `ColumnSet` type