Allow the MockDmlHandler to capture any input type given to the write()
method. This lets us reuse the mock across all handler implementations,
regardless of their expected write input type.
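As a rough illustration only (the type and method names below are invented, not the actual MockDmlHandler API), a write-capturing mock that is generic over its input type could look like this:

```rust
use std::sync::Mutex;

/// Illustrative mock: `write()` accepts whatever input type `W` the
/// handler under test happens to use, recording each call.
#[derive(Debug, Default)]
struct MockWriteCapture<W> {
    calls: Mutex<Vec<W>>,
}

impl<W: Clone> MockWriteCapture<W> {
    /// Record the write input passed by the code under test.
    fn write(&self, input: W) {
        self.calls.lock().unwrap().push(input);
    }

    /// Hand the captured inputs back to the test for assertions.
    fn calls(&self) -> Vec<W> {
        self.calls.lock().unwrap().clone()
    }
}

fn main() {
    // The same mock type works regardless of the write input type.
    let mock = MockWriteCapture::<String>::default();
    mock.write("cpu,host=a usage=1".to_string());
    assert_eq!(mock.calls().len(), 1);
}
```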
Allow a DML handler to specify the write input type on which it
operates.
This allows us to construct a write handler pipeline that transforms the
request as it passes through the various handlers. We'll use this to
implement a handler that annotates a normal set of table writes with the
partition key, modifying downstream handlers to expect this annotated
input.
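A minimal sketch of this idea (the trait, types, and partition key below are made up for illustration; the real DmlHandler differs) might look like:

```rust
/// Illustrative trait: each handler names the write input it accepts, so
/// a decorator can change the type flowing to the next handler.
trait WriteHandler {
    type WriteInput;
    fn write(&self, input: Self::WriteInput);
}

/// Decorator that annotates plain table writes with a (made-up) partition
/// key before passing the transformed input to the inner handler.
struct Annotate<T> {
    inner: T,
}

impl<T> WriteHandler for Annotate<T>
where
    T: WriteHandler<WriteInput = (String, Vec<String>)>,
{
    // Accepts the un-annotated form...
    type WriteInput = Vec<String>;

    fn write(&self, tables: Self::WriteInput) {
        // ...and forwards the annotated form, so downstream handlers must
        // expect the annotated input type.
        self.inner.write(("2022-01-01".to_string(), tables));
    }
}

/// Terminal handler expecting the annotated input.
struct Sink;
impl WriteHandler for Sink {
    type WriteInput = (String, Vec<String>);
    fn write(&self, (partition_key, tables): Self::WriteInput) {
        println!("partition {partition_key}: {} table(s)", tables.len());
    }
}

fn main() {
    Annotate { inner: Sink }.write(vec!["cpu".to_string()]);
}
```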
* feat: allow catalog access w/o a transaction
Now the caller has full control over whether or not to use a
transaction.
* fix: remove non-transaction-safe `create_many`
* fix: remove unnecessary transactions
* refactor: catalog Unit of Work (= transaction)
Set up an interface to handle Units of Work within our catalog.
Previously both the Postgres and the in-mem backend used
"mini-transactions on demand". Now the caller has a clear way to
establish boundaries and gets read and write isolation. A single
`Arc<dyn Catalog>` can create as many `Box<dyn UnitOfWork>` as you
like, but note that depending on the backend you may not scale
infinitely (Postgres will likely impose certain limits and the in-mem
backend limits concurrency to 1 to keep things simple). A rough sketch
of this shape is included after this list.
* docs: improve wording
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* refactor: rename Unit of Work to Transaction
* test: improve `test_txn_isolation`
* feat: clarify transaction drop semantics
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
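As referenced above, a rough sketch of this shape (trait and method names are assumptions for illustration, not the actual catalog API):

```rust
use std::sync::{Arc, Mutex};

/// One shared catalog handle can open any number of transactions.
trait Catalog: Send + Sync {
    fn start_transaction(&self) -> Box<dyn Transaction>;
}

/// Each transaction buffers its writes and is finished explicitly.
trait Transaction: Send {
    fn create_namespace(&mut self, name: &str);
    fn commit(self: Box<Self>);
    fn abort(self: Box<Self>);
}

/// Toy in-mem backend: pending writes only become visible on commit.
#[derive(Default)]
struct MemCatalog {
    namespaces: Arc<Mutex<Vec<String>>>,
}

struct MemTxn {
    pending: Vec<String>,
    namespaces: Arc<Mutex<Vec<String>>>,
}

impl Catalog for MemCatalog {
    fn start_transaction(&self) -> Box<dyn Transaction> {
        Box::new(MemTxn {
            pending: vec![],
            namespaces: Arc::clone(&self.namespaces),
        })
    }
}

impl Transaction for MemTxn {
    fn create_namespace(&mut self, name: &str) {
        self.pending.push(name.to_string());
    }

    fn commit(self: Box<Self>) {
        let this = *self;
        this.namespaces.lock().unwrap().extend(this.pending);
    }

    fn abort(self: Box<Self>) {
        // Dropping the transaction discards the pending writes.
    }
}

fn main() {
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
    let mut txn = catalog.start_transaction();
    txn.create_namespace("my_org_my_bucket");
    txn.commit();
}
```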
Adds a simple wrapper type that maps the namespace keyspace over a set
of N namespace schema caches, thereby reducing cache lock contention by
a factor of N (in a perfect world).
This will help smooth out latency of workloads that include new
namespace requests or incremental schema additions. It should also
significantly help latency during initial cache warming of a freshly
booted router.
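A toy version of such a wrapper (structure and names invented for illustration) that hashes each namespace onto one of N lock-guarded caches:

```rust
use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Maps the namespace keyspace over N independent caches: a namespace
/// always hashes to the same shard, so requests for different namespaces
/// usually contend on different locks.
struct ShardedCache<V> {
    shards: Vec<RwLock<HashMap<String, V>>>,
}

impl<V: Clone> ShardedCache<V> {
    fn new(n: usize) -> Self {
        Self {
            shards: (0..n).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    fn shard(&self, namespace: &str) -> &RwLock<HashMap<String, V>> {
        let mut h = DefaultHasher::new();
        namespace.hash(&mut h);
        &self.shards[(h.finish() as usize) % self.shards.len()]
    }

    fn get(&self, namespace: &str) -> Option<V> {
        self.shard(namespace).read().unwrap().get(namespace).cloned()
    }

    fn put(&self, namespace: &str, v: V) {
        self.shard(namespace)
            .write()
            .unwrap()
            .insert(namespace.to_string(), v);
    }
}

fn main() {
    let cache = ShardedCache::new(10);
    cache.put("org_bucket", 42_u32);
    assert_eq!(cache.get("org_bucket"), Some(42));
}
```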
This adds the scaffolding for the ingester server to consume data from Kafka. Data is ingested into an in-memory structure while records are created in the catalog for any partitions that don't yet exist.
I've removed catalog_update.rs in ingester for now. It was mostly a placeholder and will be folded into handler.rs and data.rs in my next PR, which will have a primitive lifecycle wired up.
There's one ugly bit here where the DML write is cloned because it is borrowed to emit spans and metrics. I'll follow up with a refactor so that the DML write's tables can be consumed without interfering with the metrics instrumentation.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Adds an in-memory cache of table schemas to the SchemaValidator DML
handler.
The cache pulls from the global catalog when observing a column for the
first time, and pushes the column type to set it for subsequent requests
if it does not exist (this pull & push is done atomically by the
catalog in an "upsert" call).
The in-memory cache is sharded by namespace, with each shard guarded by
an individual lock to minimise contention between readers (the expected
average case) and writers (only when adding new columns/tables).
Relies on the catalog to serialise new column creation and validate
parallel creation requests.
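A sketch of the pull & push flow (function and type names invented; the per-namespace sharding and locking described above are omitted here):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Column types are just strings in this sketch.
type ColumnType = String;

/// Stand-in for the catalog "upsert": returns the canonical type for the
/// column, either the one we asked for or whatever a concurrent writer
/// already set. Here we always pretend we won the race.
fn catalog_upsert_column(_table: &str, _column: &str, wanted: &ColumnType) -> ColumnType {
    wanted.clone()
}

struct SchemaCache {
    /// Keyed by (table, column).
    columns: RwLock<HashMap<(String, String), ColumnType>>,
}

impl SchemaCache {
    /// Serve from the cache when possible; otherwise pull/push through the
    /// catalog upsert and remember the result for subsequent requests.
    fn column_type(&self, table: &str, column: &str, wanted: &ColumnType) -> ColumnType {
        let key = (table.to_string(), column.to_string());
        if let Some(t) = self.columns.read().unwrap().get(&key) {
            return t.clone();
        }
        let canonical = catalog_upsert_column(table, column, wanted);
        self.columns.write().unwrap().insert(key, canonical.clone());
        canonical
    }
}

fn main() {
    let cache = SchemaCache { columns: RwLock::new(HashMap::new()) };
    // First observation goes through the catalog, later ones hit the cache.
    assert_eq!(cache.column_type("cpu", "usage", &"f64".to_string()), "f64");
    assert_eq!(cache.column_type("cpu", "usage", &"i64".to_string()), "f64");
}
```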
Implements a write schema validation DML handler, denying requests that
conflict with the schema within the global catalog. Additive schema
changes are accepted, incrementally updating the global catalog schema.
Deletes are passed through unchanged and unvalidated.
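The accept/deny decision boils down to something like the following (illustrative types only; the real handler works on catalog schema types):

```rust
use std::collections::HashMap;

/// Outcome of validating one incoming column against the known schema.
#[derive(Debug, PartialEq)]
enum Validation {
    Ok,               // column already exists with the same type
    Additive(String), // new column: extend the schema
    Conflict,         // same name, different type: deny the write
}

fn validate(known: &HashMap<String, String>, column: &str, ty: &str) -> Validation {
    match known.get(column) {
        Some(existing) if existing.as_str() == ty => Validation::Ok,
        Some(_) => Validation::Conflict,
        None => Validation::Additive(column.to_string()),
    }
}

fn main() {
    let mut schema = HashMap::from([("usage".to_string(), "f64".to_string())]);
    assert_eq!(validate(&schema, "usage", "f64"), Validation::Ok);
    assert_eq!(validate(&schema, "usage", "str"), Validation::Conflict);
    // Additive change: accept it and update the (cached/global) schema.
    if let Validation::Additive(column) = validate(&schema, "host", "tag") {
        schema.insert(column, "tag".to_string());
    }
}
```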
Allows the DmlHandler to return different types for each method.
This enables a DmlHandler implementation decorating an inner handler to
return the inner handler's error directly, avoiding any "wrapper"
errors.
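For illustration only (not the actual trait definition), per-method associated error types let a decorator re-export the inner handler's errors unchanged:

```rust
/// Sketch of a handler trait whose write and delete methods each carry
/// their own error type.
trait Handler {
    type WriteError;
    type DeleteError;
    fn write(&self, lp: &str) -> Result<(), Self::WriteError>;
    fn delete(&self, table: &str) -> Result<(), Self::DeleteError>;
}

/// Decorator that logs and forwards; its error types are the inner
/// handler's own, so callers see the original error, not a wrapper.
struct Logging<T>(T);

impl<T: Handler> Handler for Logging<T> {
    type WriteError = T::WriteError;
    type DeleteError = T::DeleteError;

    fn write(&self, lp: &str) -> Result<(), Self::WriteError> {
        println!("write: {lp}");
        self.0.write(lp)
    }

    fn delete(&self, table: &str) -> Result<(), Self::DeleteError> {
        println!("delete: {table}");
        self.0.delete(table)
    }
}

struct Nop;
impl Handler for Nop {
    type WriteError = std::convert::Infallible;
    type DeleteError = std::convert::Infallible;
    fn write(&self, _lp: &str) -> Result<(), Self::WriteError> {
        Ok(())
    }
    fn delete(&self, _table: &str) -> Result<(), Self::DeleteError> {
        Ok(())
    }
}

fn main() {
    let handler = Logging(Nop);
    handler.write("cpu field=1").unwrap();
    handler.delete("cpu").unwrap();
}
```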
* feat: Add db_name/namespace to DmlWrite and DmlDelete
This is required for the new ingester to be able to work with the write buffer. The protobuf that gets serialized over Kafka already includes the database name; it just wasn't being carried through to the marshaled DML operation.
* fix: database != namespace, propagation through write buffer
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Initialises a ShardedWriteBuffer for the hard-coded "iox_shared" topic.
Adds the following CLI flags:
* --write-buffer: type of buffer [kafka, rskafka, file]
* --write-buffer-addr: write buffer endpoint address
The server uses these config options to initialise the appropriate write
buffer backend, and configure the TableNamespaceSharder to shard
operations over the set of sequencers exposed by the write buffer.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: Sequencer wrapper
This type wraps an underlying WriteBufferWriter implementation, tagging
it with a sequencer ID it should use when enqueuing operations to the
buffer.
* feat: mock sharder
Implements a mock Sharder impl that returns pre-configured responses to
shard(), and captures the input to the call.
* feat: sharded write buffer
Implements sharding of ops into an underlying WriteBuffer.
Writes are sharded by some abstract Sharder impl, collated per shard to
maximise the size of each op (and therefore compression efficiency),
converted into a DML operation and then enqueued in parallel to the
underlying WriteBuffer implementation (a sketch of this shard-and-collate
step is included below).
Deletes are modelled as being mapped to a single write buffer shard,
which is the case while we support sharding based on the table &
namespace only. Deletes will be extended to support (potentially)
multiple shards when column overrides are implemented.
* refactor: runtime write buffers
Switch from static dispatch to a runtime-specified WriteBufferWriting
implementation.
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
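As noted above, the shard-and-collate step amounts to grouping table batches by their assigned shard so each shard receives one combined op; a simplified sketch (invented types standing in for MutableBatch and the Sharder):

```rust
use std::collections::HashMap;

/// Assign each table's batch to a shard and merge batches that land on the
/// same shard, so one op per shard can be enqueued afterwards.
fn shard_and_collate(
    tables: Vec<(String, Vec<String>)>, // (table name, rows)
    shard: impl Fn(&str) -> u32,        // abstract sharder
) -> HashMap<u32, Vec<(String, Vec<String>)>> {
    let mut per_shard: HashMap<u32, Vec<(String, Vec<String>)>> = HashMap::new();
    for (table, rows) in tables {
        let shard_id = shard(table.as_str());
        per_shard.entry(shard_id).or_default().push((table, rows));
    }
    per_shard
}

fn main() {
    let ops = shard_and_collate(
        vec![
            ("cpu".to_string(), vec!["cpu field=1".to_string()]),
            ("mem".to_string(), vec!["mem field=2".to_string()]),
        ],
        // Toy sharder: bucket on the first byte of the table name.
        |table| table.as_bytes()[0] as u32 % 4,
    );
    // In the real pipeline each per-shard group becomes one DML op that is
    // enqueued to its sequencer in parallel with the others.
    for (shard_id, batches) in ops {
        println!("shard {shard_id}: {} table batch(es)", batches.len());
    }
}
```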
This commit defines the Sharder trait that should allow us to implement
multiple sharding strategies over a defined set of input types (such as
a MutableBatch for writes, DeletePredicate for deletes, etc).
This commit also includes a jump hash implementation that consistently
shards (table name, namespace) tuples to a given shard for all input
types.
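For reference, the jump consistent hash function at the core of such a sharder fits in a few lines; the (table, namespace) wrapper below is illustrative, not the actual IOx type:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Jump consistent hash (Lamping & Veach): maps a 64-bit key onto one of
/// `buckets` shards, moving few keys when the shard count changes.
fn jump_hash(mut key: u64, buckets: u32) -> u32 {
    assert!(buckets > 0);
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

/// Hash a (table, namespace) tuple to a key, then jump hash onto the shards.
fn shard_for(table: &str, namespace: &str, shards: u32) -> u32 {
    let mut h = DefaultHasher::new();
    (table, namespace).hash(&mut h);
    jump_hash(h.finish(), shards)
}

fn main() {
    // The same (table, namespace) tuple always lands on the same shard.
    assert_eq!(
        shard_for("cpu", "org_bucket", 8),
        shard_for("cpu", "org_bucket", 8)
    );
}
```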
Changes the DmlHandler::delete() trait method to accept required params,
and accept a DeletePredicate instead of a HttpDeleteRequest so that it
can be re-used in the gRPC handler.
Instead of converting the set of MutableBatches into a DmlOperation to
shard into more DmlOperation instances, the sharder can operate directly
on the MutableBatches.
Defines the DmlHandler trait responsible for processing a request in
some abstract way, decoupling the HTTP/gRPC request handlers from the
underlying routing logic.
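A bare-bones sketch of that decoupling (names invented; the real trait is richer, e.g. it also covers deletes and typed DML inputs):

```rust
/// The request-handling layer only knows about "something that can
/// process DML"; sharding, schema validation, etc. live behind it.
trait DmlProcessor {
    fn apply(&self, namespace: &str, line_protocol: &str) -> Result<(), String>;
}

/// Stand-in HTTP write endpoint: parsing/auth would happen here, then the
/// request is handed straight to the processor.
fn http_write<D: DmlProcessor>(handler: &D, namespace: &str, body: &str) -> Result<(), String> {
    handler.apply(namespace, body)
}

struct PrintingProcessor;

impl DmlProcessor for PrintingProcessor {
    fn apply(&self, namespace: &str, line_protocol: &str) -> Result<(), String> {
        println!("{namespace}: {line_protocol}");
        Ok(())
    }
}

fn main() {
    http_write(&PrintingProcessor, "org_bucket", "cpu field=1").unwrap();
}
```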