* refactor: rework querier concurrency limiting
With #4752 we introduced a concurrency limit into the querier. It works
by drawing permits from a central semaphore whenever we create a
`QuerierNamespace`. However, this only limits concurrency during query
planning, not query execution, because the objects contained within the
plan (chunks and some metadata) reference neither the permit nor the
`QuerierNamespace`.
Now one approach to fix that would be to wire the permit all the way
down into all the query-related data structures. This however is very
fiddly and error-prone, because as soon as we transform these data
structures -- e.g. into streams -- the permit might silently get lost
again. Such bugs would likely be query-dependent and very hard to debug.
So instead we reverse the approach and track the permits at the top
layer of the stack: the gRPC service entry points. There we also need to
be careful -- e.g. when we return streams to tonic -- but it's way
easier to review that than the deeply nested object hierarchy that is
involved with queries. The separation of concerns is also a bit clearer:
why would a "chunk" care about "query concurrency" as a whole?
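A minimal sketch of that pattern, with hypothetical names (the real service wiring differs): acquire an owned permit at the gRPC entry point and move it into the response stream, so the permit lives exactly as long as query execution does.

```rust
use std::sync::Arc;

use futures::stream::{self, Stream, StreamExt};
use tokio::sync::Semaphore;

struct QueryService {
    /// Central semaphore limiting concurrent queries.
    query_semaphore: Arc<Semaphore>,
}

impl QueryService {
    /// gRPC entry point: the permit is acquired here and moved into the
    /// response stream, so it is released only once the stream is dropped,
    /// i.e. after tonic has finished sending the response.
    async fn do_query(&self) -> impl Stream<Item = String> {
        let permit = Arc::clone(&self.query_semaphore)
            .acquire_owned()
            .await
            .expect("semaphore closed");

        let results = stream::iter(vec!["batch 1".to_string(), "batch 2".to_string()]);

        // The `move` closure owns the permit for the stream's lifetime.
        results.map(move |batch| {
            let _permit = &permit;
            batch
        })
    }
}
```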
* refactor: improve gRPC permit keeping and prepare tests
* test: "optimize" ingesterrecord batches in query tests
It seems that I had the right idea in #4656 but wasn't able to trigger
https://github.com/influxdata/conductor/issues/955 because the query
tests do not "optimize" the record batches in the same way the actual
gRPC implementation does. If we apply the same transformation, we indeed
end up with the same error.
* fix: all batches within the ingester flight response must have same schema
* refactor: simplify and reuse code
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* fix: do NOT block in parquet file IO
I think for historical reasons we were using blocking IO to read parquet
files. With the current streaming `SendableRecordBatchStream` approach
this is technically NOT required anymore.
Now one might think that this sync-async dance is kinda harmless, but
looking at our production querier I think it is really bad. The querier
seems to be stuck, yet `strace` and other health signals suggest it is
not entirely dead. GDB backtraces show that nearly all threads are busy
in `download_and_scan_parquet`. Looking at the tokio docs
(<https://docs.rs/tokio/1.18.2/tokio/task/fn.spawn_blocking.html>)
for `spawn_blocking` (which is used to start the sync download) this
makes sense: tokio only starts a replacement for the current runtime
thread (the one that calls `spawn_blocking`) if doing so does NOT exceed
the runtime thread limit. However, we set the runtime thread limit to
the number of CPU cores available to IOx, so that limit is a real
constraint. This means only a few threads are left to do actual work
(I've seen postgres data flowing back and forth, for example) and tokio
cannot use its full potential anymore. This is especially bad because
the sync code in `download_and_scan_parquet` then uses the `futures`
`block_on` functionality to call back into async code, so it waits on
tokio itself.
The change is rather simple: just use async task spawns.
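As a rough before/after sketch (function names here are placeholders, not the actual IOx code):

```rust
use tokio::task;

// Placeholder for the async download-and-decode work.
async fn download_and_scan(path: String) -> Result<(), std::io::Error> {
    let _ = path; // ... stream the parquet file with async IO ...
    Ok(())
}

async fn read_parquet(path: String) -> Result<(), std::io::Error> {
    // BEFORE (problematic): park one of the N runtime threads on sync work,
    // then call *back* into async land via `block_on`. With the runtime
    // capped at one thread per core, enough concurrent reads starve tokio
    // of the threads needed to drive the very futures being waited on:
    //
    //   task::spawn_blocking(move || {
    //       futures::executor::block_on(download_and_scan(path))
    //   }).await
    //
    // AFTER: a plain async task that the scheduler can interleave freely.
    task::spawn(download_and_scan(path))
        .await
        .expect("task panicked")
}
```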
* fix: use async IO to write stream to temp file
* fix: do not block tokio thread during parquet file reading
* refactor: ensure parquet IO tasks are cancelled if they are not needed anymore
There is no REAL way to cancel sync tasks, but at least we can try our
best.
* chore: Update datafusion deps
* fix: fix for changes in ScalarValue
* fix: fix for using TableSource rather than TableProvider
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: enable debugging of failed querier->ingester requests
- extend `query-ingester` CLI to allow usage of predicates
- on failed requests: log all the information required for the CLI
- test the "ingester fails" scenario
* test: explain
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* docs: improve
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: move b64 pred. serde into a single crate
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Implements an upload() method on the ParquetStorage type, consuming a
stream of RecordBatch, serialising the Parquet file, and uploading the
result to object storage. Returns the IOx-specific file metadata.
Currently, while the upload() method accepts a stream of RecordBatch,
the actual resulting Parquet file is buffered in memory before being
uploaded to the object store, due to the lack of streaming upload
functionality in the ObjectStore abstraction - this isn't the end of the
world, as the files tend to be relatively small with our current usage.
This impl should be easily modified to be fully streaming once streaming
object store puts are implemented:
https://github.com/influxdata/object_store_rs/issues/9
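A rough sketch of the shape this describes; the type and method names here are assumptions rather than the verbatim IOx API, and `serialize_to_parquet` is a hypothetical helper (a concrete serialiser is sketched further below):

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{path::Path, ObjectStore};

/// Stand-in for the IOx-specific file metadata mentioned above.
struct IoxMetadata;

/// Hypothetical helper; see the serialiser sketch below.
async fn serialize_to_parquet(
    _batches: BoxStream<'static, RecordBatch>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    unimplemented!("see the serialiser sketch below")
}

struct ParquetStorage {
    store: Arc<dyn ObjectStore>,
}

impl ParquetStorage {
    async fn upload(
        &self,
        batches: BoxStream<'static, RecordBatch>,
        location: &Path,
    ) -> Result<IoxMetadata, Box<dyn std::error::Error>> {
        // Buffer the whole encoded file in memory: the ObjectStore
        // abstraction has no streaming puts yet (object_store_rs issue #9)
        // and the files are small enough for that to be acceptable.
        let buffer = serialize_to_parquet(batches).await?;
        self.store.put(location, Bytes::from(buffer)).await?;
        Ok(IoxMetadata)
    }
}
```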
Implements a streaming RecordBatch to Parquet file serialiser.
This impl automatically discovers the schema of the RecordBatch stream,
and accepts &mut destination types (internalising the handle
cloning/etc) to simplify caller usage.
This encoder returns the resulting FileMetaData so that callers can
inspect the metadata without reading back the file.
Currently unused / not yet plumbed in.
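A minimal sketch of such an encoder using the `parquet` crate's `ArrowWriter` (the real IOx implementation may differ in detail):

```rust
use arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{Stream, StreamExt};
use parquet::{arrow::ArrowWriter, format::FileMetaData};

/// Encode a stream of `RecordBatch` into `sink`, discovering the schema
/// from the first batch and returning the Parquet `FileMetaData`.
async fn to_parquet<S>(
    mut batches: S,
    sink: &mut Vec<u8>,
) -> Result<FileMetaData, Box<dyn std::error::Error>>
where
    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
{
    // Peek the first batch to learn the schema.
    let first = batches.next().await.ok_or("empty record batch stream")??;
    let mut writer = ArrowWriter::try_new(sink, first.schema(), None)?;

    writer.write(&first)?;
    while let Some(batch) = batches.next().await {
        writer.write(&batch?)?;
    }

    // `close` flushes the footer and hands back the file metadata, so the
    // caller can inspect it without reading the file back.
    Ok(writer.close()?)
}
```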
* fix: ensure that query tokio background tasks are canceled
While I am not entirely sure if this explains some of the memory leaks I
am seeing in prod, not canceling the tasks correctly certainly makes
debugging way harder and also renders certain forms of throttling (e.g.
max. concurrent queries) somewhat ineffective.
Note that parquet file downloads are currently NOT canceled because
tokio's `spawn_blocking` cannot be canceled.
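One common way to approximate this, sketched here under the assumption that the tasks are plain async spawns (not necessarily the exact IOx mechanism), is to abort the task when its handle is dropped:

```rust
use tokio::task::JoinHandle;

/// Abort the wrapped task as soon as the handle is dropped, e.g. when a
/// client goes away and the query result stream is dropped mid-flight.
struct AbortOnDrop<T>(JoinHandle<T>);

impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) {
        // `abort` cancels async tasks at their next await point. It has no
        // effect on an already-running `spawn_blocking` closure, which is
        // why the parquet downloads mentioned above cannot be canceled.
        self.0.abort();
    }
}
```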
* refactor: `Vec` -> `Option`
* refactor: `spawn_blocking` creates a join handle, even though it is useless
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Changes the code paths that interact with Parquet files in the object
store to reference the ParquetStorage directly (DRY refactor).
This change takes us from a dependency graph of:
        ┌─────────────────┐
        │                 │
        ▼                 │
Parquet Consumer          │
        │         ┌──────────────┐
        ├────────▶│ParquetStorage│
        ▼         └──────────────┘
┌──────────────┐
│ ObjectStore  │
└──────────────┘
        │
   ┌────┴────┐
   ▼         ▼
 File        s3
System      (etc)
to:
Parquet Consumer
        │
        ▼
┌──────────────┐
│ParquetStorage│
└──────────────┘
        │
        ▼
┌──────────────┐
│ ObjectStore  │
└──────────────┘
        │
   ┌────┴────┐
   ▼         ▼
 File        s3
System      (etc)
With the ParquetStorage being solely responsible for managing
interactions with the object store when dealing with Parquet files.
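In code terms the change looks roughly like this (type names assumed for illustration, not the verbatim IOx types):

```rust
use std::sync::Arc;

use object_store::ObjectStore;

struct ParquetStorage {
    store: Arc<dyn ObjectStore>,
}

// Before: every consumer held its own object store handle and duplicated
// the path/encoding logic:
//
//   struct ParquetChunk {
//       object_store: Arc<dyn ObjectStore>,
//       // ...
//   }
//
// After: consumers depend only on the one Parquet-aware abstraction.
struct ParquetChunk {
    storage: ParquetStorage,
    // ...
}
```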
* refactor: require `Resource`s to be convertible to `u64`
* refactor: require `Resource`s to have a unit name
* refactor: make LRU cache IDs static
* feat: add LRU cache metrics
* docs: improve type names in LRU doctest
* docs: explain `MeasuredT`
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* docs: explain `test_metrics`
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
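For the two `Resource` requirements above (convertibility to `u64` plus a unit name, which feed the new LRU cache metrics), a hedged sketch of what the reworked trait might demand -- the real IOx trait almost certainly differs:

```rust
/// Hypothetical shape of the reworked trait: metrics need a raw number
/// and a unit label to report.
trait Resource: Into<u64> + Copy {
    /// Unit name used for metric labels, e.g. "bytes".
    fn unit() -> &'static str;
}

/// Example resource: memory consumption measured in bytes.
#[derive(Clone, Copy)]
struct RamSize(u64);

impl From<RamSize> for u64 {
    fn from(s: RamSize) -> Self {
        s.0
    }
}

impl Resource for RamSize {
    fn unit() -> &'static str {
        "bytes"
    }
}
```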
This commit changes the protobuf record batch encoding to skip entirely
NULL columns when serialising. This prevents the deserialisation from
erroring due to a column type inference failure.
Prior to this commit, when the system was presented with a record batch
such as this:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-01-01 | 1 | NULL |
| 1970-07-05 | NULL | 1 |
This batch would be partitioned by YMD into two separate partitions:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-01-01 | 1 | NULL |
and:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-07-05 | NULL | 1 |
Both partitions would contain an entirely NULL column.
Both of these partitioned record batches would be successfully encoded,
but decoding a partition fails because the column type cannot be
inferred from the serialised format, which contains no values and on the
wire looks like:
Column {
    column_name: "B",
    semantic_type: Field,
    values: Some(
        Values {
            i64_values: [],
            f64_values: [],
            u64_values: [],
            string_values: [],
            bool_values: [],
            bytes_values: [],
            packed_string_values: None,
            interned_string_values: None,
        },
    ),
    null_mask: [
        1,
    ],
},
In a column that is not entirely NULL, one of the "Values" fields would
be non-empty, and the decoder would use this to infer the type of the
column.
Because we have chosen to not differentiate between "NULL" and "empty"
in our proto encoding, the decoder cannot infer which field within the
"Values" struct the column belongs to - all are valid, but empty.
This commit prevents this type inference failure by skipping any columns
that are entirely NULL during serialisation, preventing the deserialiser
from having to process columns with ambiguous types.
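A minimal sketch of that filter (assumed names, not the verbatim IOx encoder):

```rust
use arrow::{array::Array, record_batch::RecordBatch};

/// Stand-in for the wire-format `Column` message shown above.
struct Column {/* ... */}

fn encode_column(_name: &str, _values: &dyn Array) -> Column {
    Column {} // ... populate the matching "Values" field (elided) ...
}

fn encode_batch(batch: &RecordBatch) -> Vec<Column> {
    batch
        .schema()
        .fields()
        .iter()
        .zip(batch.columns())
        .filter(|(_, col)| {
            // Skip entirely-NULL columns: with every "Values" field empty
            // the decoder has no way to infer their type.
            col.null_count() != col.len()
        })
        .map(|(field, col)| encode_column(field.name(), col.as_ref()))
        .collect()
}
```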
* ci: fix cargo deny
* chore: downgrade `socket2`, version 0.4.5 was yanked
* chore: rename `query` to `iox_query`
`query` is already taken on crates.io and yanked and I am getting tired
of working around that.