Implements an upload() method on the ParquetStorage type, consuming a
stream of RecordBatch, serialising the Parquet file, and uploading the
result to object storage. Returns the IOx-specific file metadata.
Currently, while the upload() method accepts a stream of RecordBatch, the
resulting Parquet file is buffered in memory before being uploaded to the
object store, due to the lack of streaming upload functionality in the
ObjectStore abstraction. This isn't the end of the world, as the files
tend to be relatively small with our current usage.
This impl should be easily modified to be fully streaming once streaming
object store puts are implemented:
https://github.com/influxdata/object_store_rs/issues/9
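A rough sketch of the current buffered flow, using stand-in types (`Batch`,
`Store`, `encode_parquet`, and the struct layouts below are illustrative, not
the real IOx/arrow APIs):

```rust
struct Batch;                       // stand-in for arrow's RecordBatch
struct FileMeta;                    // stand-in for parquet's FileMetaData
struct IoxParquetMetaData(FileMeta);

trait Store {
    fn put(&self, path: &str, bytes: Vec<u8>) -> Result<(), String>;
}

// Stand-in for an ArrowWriter-style encoder that writes into a Vec<u8>.
fn encode_parquet(_batches: &[Batch]) -> (Vec<u8>, FileMeta) {
    (Vec::new(), FileMeta)
}

struct ParquetStorage<S: Store> {
    store: S,
}

impl<S: Store> ParquetStorage<S> {
    fn upload(&self, batches: Vec<Batch>, path: &str) -> Result<IoxParquetMetaData, String> {
        // The whole Parquet file is buffered in memory here; a streaming put
        // (object_store_rs#9) would let row groups be uploaded as they are encoded.
        let (bytes, meta) = encode_parquet(&batches);
        self.store.put(path, bytes)?;
        Ok(IoxParquetMetaData(meta))
    }
}
```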
Construct an IoxParquetMetaData instance directly from the FileMetaData
instance returned by the ArrowWriter.
This change will allow us to avoid the inefficient impl currently in
use:
* Serialise batches into memory
* Wrap buffer in arrow cursor
* Read parquet metadata with arrow file reader
* Serialise schema with thrift
* Serialise each row group's metadata with thrift
* Construct our own FileMetaData instance
* Serialise FileMetaData with thrift
* zstd encode resulting thrift bytes
* Wrap in IoxParquetMetaData
Now we "only":
* Stream batches into opaque Write impl
* Serialise FileMetaData with thrift
* zstd encode resulting thrift bytes
* Wrap in IoxParquetMetaData
As before this change, accessing any data within the IoxParquetMetaData
still requires deserialising it first.
There are still a number of easy performance improvements to be had
w.r.t. the metadata handling.
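For illustration, a loose sketch of the new construction path, assuming a
stand-in `thrift_encode` helper, a simplified `IoxParquetMetaData` layout, and
the `zstd` crate for compression (the real IOx types differ):

```rust
struct FileMeta;                     // stand-in for parquet's FileMetaData

struct IoxParquetMetaData {
    /// zstd-compressed, thrift-encoded FileMetaData.
    data: Vec<u8>,
}

// Stand-in for serialising FileMetaData with the thrift compact protocol.
fn thrift_encode(_meta: &FileMeta) -> Vec<u8> {
    Vec::new()
}

impl IoxParquetMetaData {
    /// Build directly from the writer's FileMetaData: no re-reading of the
    /// serialised file and no per-row-group reconstruction is required.
    fn from_file_metadata(meta: &FileMeta) -> std::io::Result<Self> {
        let thrift_bytes = thrift_encode(meta);
        // zstd-encode the thrift payload before wrapping it.
        let data = zstd::encode_all(&thrift_bytes[..], 1)?;
        Ok(Self { data })
    }
}
```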
Implements a streaming RecordBatch to Parquet file serialiser.
This impl automatically discovers the schema of the RecordBatch stream,
and accepts &mut destination types (internalising the handle
cloning/etc) to simplify caller usage.
This encoder returns the resulting FileMetaData to allow callers to
inspect the resulting metadata without reading back the file.
Currently unused / not yet plumbed in.
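A minimal sketch of the serialiser's shape under stated assumptions: `Batch`,
`Schema`, `FileMeta`, and `Encoder` are stand-ins for the arrow/parquet types,
and returning `Option` for an empty stream is an illustrative choice, not
necessarily the real behaviour:

```rust
use std::io::Write;

#[derive(Clone)]
struct Schema;
struct Batch { schema: Schema }
struct FileMeta;

// Stand-in for an ArrowWriter-like encoder over a borrowed destination.
struct Encoder<'a, W: Write> { sink: &'a mut W }

impl<'a, W: Write> Encoder<'a, W> {
    fn new(sink: &'a mut W, _schema: Schema) -> Self { Self { sink } }
    fn write(&mut self, _batch: &Batch) -> std::io::Result<()> { Ok(()) }
    fn close(self) -> std::io::Result<FileMeta> { Ok(FileMeta) }
}

/// Serialise a stream of batches into `sink`, returning the file metadata so
/// callers can inspect it without re-reading the file.
fn to_parquet<W, I>(sink: &mut W, batches: I) -> std::io::Result<Option<FileMeta>>
where
    W: Write,
    I: IntoIterator<Item = Batch>,
{
    let mut batches = batches.into_iter().peekable();
    // Discover the schema from the first batch rather than requiring it up front.
    let schema = match batches.peek() {
        Some(batch) => batch.schema.clone(),
        None => return Ok(None), // empty stream: nothing to write
    };
    let mut encoder = Encoder::new(sink, schema);
    for batch in batches {
        encoder.write(&batch)?;
    }
    encoder.close().map(Some)
}
```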
* fix: ensure that query tokio background tasks are canceled
While I am not entirely sure whether this explains some of the memory leaks
I am seeing in prod, not canceling the tasks correctly certainly makes
debugging way harder and also renders certain forms of throttling (e.g.
max. concurrent queries) somewhat ineffective.
Note that parquet file downloads are currently NOT canceled because
tokio's `spawn_blocking` cannot be canceled.
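The underlying tokio pattern (a generic sketch, not the exact IOx helper) is to
abort the `JoinHandle` when the query-side handle is dropped:

```rust
use tokio::task::JoinHandle;

/// Aborts the wrapped background task when dropped, so abandoned queries stop
/// consuming resources and no longer count against concurrency limits.
struct AbortOnDrop<T>(JoinHandle<T>);

impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) {
        // Cancels ordinary async tasks. Work running inside
        // `tokio::task::spawn_blocking` (e.g. parquet file downloads) cannot be
        // interrupted this way and will run to completion.
        self.0.abort();
    }
}

#[tokio::main]
async fn main() {
    let handle = AbortOnDrop(tokio::spawn(async {
        // ... long-running query background work ...
    }));
    drop(handle); // the task is aborted here instead of leaking
}
```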
* refactor: `Vec` -> `Option`
* refactor: `spawn_blocking` creates a join handle, even though it is useless
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Changes the code paths that interact with Parquet files in the object
store to reference the ParquetStorage directly (DRY refactor).
This change takes us from a dependency graph of:
        ┌─────────────────┐
        │                 │
        ▼                 │
Parquet Consumer          │
        │         ┌──────────────┐
        ├────────▶│ParquetStorage│
        ▼         └──────────────┘
 ┌──────────────┐
 │  ObjectStore │
 └──────────────┘
        │
   ┌────┴────┐
   ▼         ▼
  File       s3
 System     (etc)
to:
Parquet Consumer
        │
        ▼
 ┌──────────────┐
 │ParquetStorage│
 └──────────────┘
        │
        ▼
 ┌──────────────┐
 │  ObjectStore │
 └──────────────┘
        │
   ┌────┴────┐
   ▼         ▼
  File       s3
 System     (etc)
With the ParquetStorage being solely responsible for managing
interactions with the object store when dealing with Parquet files.
Renames the Storage type so its purpose is clear at the point of use (i.e.
in fn args), rather than relying on the fully-qualified import path to
convey what the type stores.
Removes two unused constructors for a ParquetChunk, and moves the bare
fn constructor that is actually used to be an associated method (a
conventional constructor).
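An approximate shape of the result, with illustrative names and fields rather
than the exact IOx definitions:

```rust
use std::sync::Arc;

// Stand-in trait; the real object store abstraction lives in object_store_rs.
trait ObjectStore: Send + Sync {}

/// Sole owner of parquet-file interactions with the object store.
#[derive(Clone)]
struct ParquetStorage {
    store: Arc<dyn ObjectStore>,
}

struct ParquetChunk {
    storage: ParquetStorage,
    // ... schema, decoded metadata, etc.
}

impl ParquetChunk {
    /// Conventional associated constructor replacing the previously bare fn.
    fn new(storage: ParquetStorage) -> Self {
        Self { storage }
    }
}
```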
* refactor: require `Resource`s to be convertible to `u64`
* refactor: require `Resource`s to have a unit name
* refactor: make LRU cache IDs static
* feat: add LRU cache metrics
* docs: improve type names in LRU doctest
* docs: explain `MeasuredT`
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* docs: explain `test_metrics`
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
This commit changes the protobuf record batch encoding to skip entirely-NULL
columns when serialising. This prevents deserialisation from erroring
due to a column type-inference failure.
Prior to this commit, when the system was presented with a record batch
such as this:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-01-01 | 1 | NULL |
| 1970-07-05 | NULL | 1 |
This batch would be partitioned by YMD into two separate partitions:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-01-01 | 1 | NULL |
and:
| time | A | B |
| ---------- | ---- | ---- |
| 1970-07-05 | NULL | 1 |
Both partitions would contain an entirely NULL column.
Both of these partitioned record batches would be successfully encoded,
but decoding a partition fails due to the inability to infer a column
type from a serialised column that contains no values, which on the
wire looks like:
Column {
    column_name: "B",
    semantic_type: Field,
    values: Some(
        Values {
            i64_values: [],
            f64_values: [],
            u64_values: [],
            string_values: [],
            bool_values: [],
            bytes_values: [],
            packed_string_values: None,
            interned_string_values: None,
        },
    ),
    null_mask: [
        1,
    ],
},
In a column that is not entirely NULL, one of the "Values" fields would
be non-empty, and the decoder would use this to infer the type of the
column.
Because we have chosen not to differentiate between "NULL" and "empty"
in our proto encoding, the decoder cannot infer which field within the
"Values" struct the column belongs to: all are valid, but empty.
This commit prevents this type inference failure by skipping any columns
that are entirely NULL during serialisation, preventing the deserialiser
from having to process columns with ambiguous types.
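A minimal sketch of that skip, using stand-in types rather than the real IOx
protobuf structs:

```rust
struct Column {
    column_name: String,
    null_count: usize, // stand-in for counting the set bits in `null_mask`
}

fn encode_batch(columns: Vec<Column>, num_rows: usize) -> Vec<Column> {
    columns
        .into_iter()
        // Skip columns that are entirely NULL: they carry no values, so the
        // decoder could not infer which `Values` field they belong to.
        .filter(|c| c.null_count < num_rows)
        .collect()
}

fn main() {
    let cols = vec![
        Column { column_name: "A".into(), null_count: 0 },
        Column { column_name: "B".into(), null_count: 1 }, // all rows NULL
    ];
    let encoded = encode_batch(cols, 1);
    assert_eq!(encoded.len(), 1);
    assert_eq!(encoded[0].column_name, "A");
}
```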
* ci: fix cargo deny
* chore: downgrade `socket2`, version 0.4.5 was yanked
* chore: rename `query` to `iox_query`
`query` is already taken (and yanked) on crates.io, and I am getting tired
of working around that.
Prior to this commit, it was possible to read/write the allocated but
unused storage bits outside of the "length" of the BitSet.
Bit access is now bounds-checked.
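A sketch of what the bounds checking amounts to (the real BitSet layout and
error handling may differ):

```rust
struct BitSet {
    buffer: Vec<u8>,
    len: usize, // number of valid bits, <= buffer.len() * 8
}

impl BitSet {
    fn get(&self, idx: usize) -> bool {
        // Reject access to allocated-but-unused bits beyond `len`.
        assert!(idx < self.len, "bit index {idx} out of bounds (len {})", self.len);
        self.buffer[idx / 8] & (1 << (idx % 8)) != 0
    }

    fn set(&mut self, idx: usize) {
        assert!(idx < self.len, "bit index {idx} out of bounds (len {})", self.len);
        self.buffer[idx / 8] |= 1 << (idx % 8);
    }
}
```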
* feat: `SortKey::size`
* feat: `FunctionEstimator`
* feat: querier RAM pool
Let's put all the caches into a single RAM pool, so we can at least
somewhat control RAM usage. Note that this does NOT limit peak memory
during query execution, but it should at least stop unlimited cache
growth. A follow-up PR will add metrics.
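A loose sketch of the pooling idea (not the actual querier implementation),
where every cache accounts its entries against one shared pool:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Shared accounting for all querier caches; bounds total cache RAM.
struct RamPool {
    limit: usize,
    used: AtomicUsize,
}

impl RamPool {
    fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { limit, used: AtomicUsize::new(0) })
    }

    /// Returns true if the entry fits; callers evict from their LRU until it does.
    fn try_reserve(&self, bytes: usize) -> bool {
        let prev = self.used.fetch_add(bytes, Ordering::SeqCst);
        if prev + bytes > self.limit {
            // Over budget: roll back the reservation.
            self.used.fetch_sub(bytes, Ordering::SeqCst);
            return false;
        }
        true
    }

    fn release(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::SeqCst);
    }
}
```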
* refactor: improve some size calculations
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>