refactor(ingester): Remove `From` dml op to construct `IngestOp` directly
Removes one of the temporary conversion traits and adds a test helper function `encode_write_op(NamespaceId, &WriteOperation)` to support removing `DmlOperation` from WAL replay and the RPC write handler.
pull/24376/head
parent
a3a4145774
commit
3e7a82f319
|
@ -1,5 +1,5 @@
|
||||||
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
||||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
use dml::{DmlMeta, DmlWrite};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use mutable_batch::MutableBatch;
|
use mutable_batch::MutableBatch;
|
||||||
use trace::ctx::SpanContext;
|
use trace::ctx::SpanContext;
|
||||||
|
@ -12,17 +12,6 @@ pub enum IngestOp {
|
||||||
Write(WriteOperation),
|
Write(WriteOperation),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<DmlOperation> for IngestOp {
|
|
||||||
fn from(value: DmlOperation) -> Self {
|
|
||||||
match value {
|
|
||||||
DmlOperation::Write(w) => Self::Write(WriteOperation::from(w)),
|
|
||||||
DmlOperation::Delete(_) => {
|
|
||||||
panic!("no corresponding ingest operation exists for DML delete")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IngestOp {
|
impl IngestOp {
|
||||||
// TODO(savage): Consider removing the getters at the top level and
|
// TODO(savage): Consider removing the getters at the top level and
|
||||||
// requiring consumers to match on the op type
|
// requiring consumers to match on the op type
|
||||||
|
@ -137,42 +126,6 @@ impl From<&WriteOperation> for DmlWrite {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(savage): Temporary `From` implementation to assist in switchover
|
|
||||||
// within ingester code. This should be removed in favour of constructing all
|
|
||||||
// [`WriteOperation`]s directly
|
|
||||||
impl From<DmlWrite> for WriteOperation {
|
|
||||||
fn from(dml_write: DmlWrite) -> Self {
|
|
||||||
let namespace_id = dml_write.namespace_id();
|
|
||||||
let partition_key = dml_write.partition_key().clone();
|
|
||||||
let sequence_number = dml_write
|
|
||||||
.meta()
|
|
||||||
.sequence()
|
|
||||||
.expect("tried to create write operation from unsequenced DML write");
|
|
||||||
let span_context = dml_write.meta().span_context().map(SpanContext::clone);
|
|
||||||
|
|
||||||
Self::new(
|
|
||||||
namespace_id,
|
|
||||||
dml_write
|
|
||||||
.into_tables()
|
|
||||||
.map(|(table, data)| {
|
|
||||||
(
|
|
||||||
table,
|
|
||||||
TableData {
|
|
||||||
table,
|
|
||||||
partitioned_data: PartitionedData {
|
|
||||||
sequence_number,
|
|
||||||
data,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
partition_key,
|
|
||||||
span_context,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A container for all data for an individual table as part of a write
|
/// A container for all data for an individual table as part of a write
|
||||||
/// operation
|
/// operation
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
||||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
|
||||||
use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op;
|
use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op;
|
||||||
use metric::U64Counter;
|
use metric::U64Counter;
|
||||||
use mutable_batch_pb::decode::decode_database_batch;
|
use mutable_batch_pb::decode::decode_database_batch;
|
||||||
|
@ -9,6 +8,7 @@ use thiserror::Error;
|
||||||
use wal::{SequencedWalOp, Wal};
|
use wal::{SequencedWalOp, Wal};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
dml_payload::{IngestOp, PartitionedData, TableData, WriteOperation},
|
||||||
dml_sink::{DmlError, DmlSink},
|
dml_sink::{DmlError, DmlSink},
|
||||||
partition_iter::PartitionIter,
|
partition_iter::PartitionIter,
|
||||||
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
|
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
|
||||||
|
@ -212,31 +212,32 @@ where
|
||||||
|
|
||||||
debug!(?op, sequence_number = sequence_number.get(), "apply wal op");
|
debug!(?op, sequence_number = sequence_number.get(), "apply wal op");
|
||||||
|
|
||||||
// Reconstruct the DML operation
|
// Reconstruct the ingest operation
|
||||||
let batches = decode_database_batch(&op)?;
|
let batches = decode_database_batch(&op)?;
|
||||||
let namespace_id = NamespaceId::new(op.database_id);
|
let namespace_id = NamespaceId::new(op.database_id);
|
||||||
let partition_key = PartitionKey::from(op.partition_key);
|
let partition_key = PartitionKey::from(op.partition_key);
|
||||||
|
|
||||||
let op = DmlWrite::new(
|
let op = WriteOperation::new(
|
||||||
namespace_id,
|
namespace_id,
|
||||||
batches
|
batches
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (TableId::new(k), v))
|
.map(|(k, v)| {
|
||||||
|
let table_id = TableId::new(k);
|
||||||
|
(
|
||||||
|
table_id,
|
||||||
|
// TODO(savage): Use table-partitioned sequence
|
||||||
|
// numbers here
|
||||||
|
TableData::new(table_id, PartitionedData::new(sequence_number, v)),
|
||||||
|
)
|
||||||
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
partition_key,
|
partition_key,
|
||||||
// The tracing context should be propagated over the RPC boundary.
|
// TODO: A tracing context should be added for WAL replay.
|
||||||
DmlMeta::sequenced(
|
None,
|
||||||
sequence_number,
|
|
||||||
iox_time::Time::MAX, // TODO: remove this from DmlMeta
|
|
||||||
// TODO: A tracing context should be added for WAL replay.
|
|
||||||
None,
|
|
||||||
42, // TODO: remove this from DmlMeta
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Apply the operation to the provided DML sink
|
// Apply the operation to the provided DML sink
|
||||||
// TODO(savage): Construct the `IngestOp::Write` directly.
|
sink.apply(IngestOp::Write(op))
|
||||||
sink.apply(DmlOperation::Write(op).into())
|
|
||||||
.await
|
.await
|
||||||
.map_err(Into::<DmlError>::into)?;
|
.map_err(Into::<DmlError>::into)?;
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use data_types::{NamespaceId, PartitionKey, TableId};
|
use data_types::{NamespaceId, PartitionKey, TableId};
|
||||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
|
||||||
use generated_types::influxdata::iox::ingester::v1::{
|
use generated_types::influxdata::iox::ingester::v1::{
|
||||||
self as proto, write_service_server::WriteService,
|
self as proto, write_service_server::WriteService,
|
||||||
};
|
};
|
||||||
|
@ -12,6 +11,7 @@ use thiserror::Error;
|
||||||
use tonic::{Code, Request, Response};
|
use tonic::{Code, Request, Response};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
dml_payload::{IngestOp, PartitionedData, TableData, WriteOperation},
|
||||||
dml_sink::{DmlError, DmlSink},
|
dml_sink::{DmlError, DmlSink},
|
||||||
ingest_state::{IngestState, IngestStateError},
|
ingest_state::{IngestState, IngestStateError},
|
||||||
timestamp_oracle::TimestampOracle,
|
timestamp_oracle::TimestampOracle,
|
||||||
|
@ -165,7 +165,7 @@ where
|
||||||
let namespace_id = NamespaceId::new(payload.database_id);
|
let namespace_id = NamespaceId::new(payload.database_id);
|
||||||
let partition_key = PartitionKey::from(payload.partition_key);
|
let partition_key = PartitionKey::from(payload.partition_key);
|
||||||
|
|
||||||
// Never attempt to create a DmlWrite with no tables - doing so causes a
|
// Never attempt to create a WriteOperation with no tables - doing so causes a
|
||||||
// panic.
|
// panic.
|
||||||
if num_tables == 0 {
|
if num_tables == 0 {
|
||||||
return Err(RpcError::NoTables)?;
|
return Err(RpcError::NoTables)?;
|
||||||
|
@ -179,31 +179,36 @@ where
|
||||||
"received rpc write"
|
"received rpc write"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Reconstruct the DML operation
|
let sequence_number = self.timestamp.next();
|
||||||
let op = DmlWrite::new(
|
|
||||||
|
// Construct the corresponding ingester write operation for the RPC payload
|
||||||
|
let op = WriteOperation::new(
|
||||||
namespace_id,
|
namespace_id,
|
||||||
batches
|
batches
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (TableId::new(k), v))
|
.map(|(k, v)| {
|
||||||
|
let table_id = TableId::new(k);
|
||||||
|
(
|
||||||
|
table_id,
|
||||||
|
// TODO(savage): Sequence partitioned data independently within a
|
||||||
|
// write.
|
||||||
|
TableData::new(table_id, PartitionedData::new(sequence_number, v)),
|
||||||
|
)
|
||||||
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
partition_key,
|
partition_key,
|
||||||
DmlMeta::sequenced(
|
// TODO:
|
||||||
self.timestamp.next(),
|
// The tracing context should be propagated over the RPC boundary.
|
||||||
iox_time::Time::MAX, // TODO: remove this from DmlMeta
|
//
|
||||||
// The tracing context should be propagated over the RPC boundary.
|
// See https://github.com/influxdata/influxdb_iox/issues/6177
|
||||||
//
|
None,
|
||||||
// See https://github.com/influxdata/influxdb_iox/issues/6177
|
|
||||||
None,
|
|
||||||
42, // TODO: remove this from DmlMeta
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Apply the DML op to the in-memory buffer.
|
// Apply the IngestOp to the in-memory buffer.
|
||||||
// TODO(savage): Construct the `IngestOp::Write` directly
|
match self.sink.apply(IngestOp::Write(op)).await {
|
||||||
match self.sink.apply(DmlOperation::Write(op).into()).await {
|
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(error=%e, "failed to apply DML op");
|
error!(error=%e, "failed to apply ingest operation");
|
||||||
return Err(e.into())?;
|
return Err(e.into())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -292,7 +297,7 @@ mod tests {
|
||||||
sink_ret = Ok(()),
|
sink_ret = Ok(()),
|
||||||
want_err = false,
|
want_err = false,
|
||||||
want_calls = [IngestOp::Write(w)] => {
|
want_calls = [IngestOp::Write(w)] => {
|
||||||
// Assert the various DmlWrite properties match the expected values
|
// Assert the various IngestOp properties match the expected values
|
||||||
assert_eq!(w.namespace(), NAMESPACE_ID);
|
assert_eq!(w.namespace(), NAMESPACE_ID);
|
||||||
assert_eq!(w.tables().count(), 1);
|
assert_eq!(w.tables().count(), 1);
|
||||||
assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
|
assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
|
||||||
|
|
|
@ -108,7 +108,7 @@ impl WalAppender for Arc<wal::Wal> {
|
||||||
.map(|(table_id, data)| {
|
.map(|(table_id, data)| {
|
||||||
(
|
(
|
||||||
*table_id,
|
*table_id,
|
||||||
data.partitioned_data().sequence_number().get() as u64, // TODO(savage): Why is this signed?
|
data.partitioned_data().sequence_number().get() as u64,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect::<HashMap<TableId, u64>>();
|
.collect::<HashMap<TableId, u64>>();
|
||||||
|
@ -138,11 +138,16 @@ mod tests {
|
||||||
|
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
|
||||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
use generated_types::influxdata::pbdata::v1::DatabaseBatch;
|
||||||
use mutable_batch_lp::lines_to_batches;
|
use mutable_batch_lp::lines_to_batches;
|
||||||
|
use mutable_batch_pb::encode::encode_batch;
|
||||||
use wal::Wal;
|
use wal::Wal;
|
||||||
|
|
||||||
use crate::{dml_sink::mock_sink::MockDmlSink, test_util::make_write_op};
|
use crate::{
|
||||||
|
dml_payload::{PartitionedData, TableData, WriteOperation},
|
||||||
|
dml_sink::mock_sink::MockDmlSink,
|
||||||
|
test_util::make_write_op,
|
||||||
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
@ -165,26 +170,38 @@ mod tests {
|
||||||
0,
|
0,
|
||||||
)
|
)
|
||||||
.expect("invalid line proto");
|
.expect("invalid line proto");
|
||||||
let op = DmlWrite::new(
|
let op = WriteOperation::new(
|
||||||
NAMESPACE_ID,
|
NAMESPACE_ID,
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
TABLE_ID,
|
TABLE_ID,
|
||||||
tables_by_name
|
TableData::new(
|
||||||
.remove(TABLE_NAME)
|
TABLE_ID,
|
||||||
.expect("table does not exist in LP"),
|
PartitionedData::new(
|
||||||
|
SequenceNumber::new(42),
|
||||||
|
tables_by_name
|
||||||
|
.remove(TABLE_NAME)
|
||||||
|
.expect("table does not exist in LP"),
|
||||||
|
),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
SECOND_TABLE_ID,
|
SECOND_TABLE_ID,
|
||||||
tables_by_name
|
TableData::new(
|
||||||
.remove(SECOND_TABLE_NAME)
|
SECOND_TABLE_ID,
|
||||||
.expect("second table does not exist in LP"),
|
PartitionedData::new(
|
||||||
|
SequenceNumber::new(42),
|
||||||
|
tables_by_name
|
||||||
|
.remove(SECOND_TABLE_NAME)
|
||||||
|
.expect("second table does not exist in LP"),
|
||||||
|
),
|
||||||
|
),
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect(),
|
.collect(),
|
||||||
PartitionKey::from("p1"),
|
PartitionKey::from("p1"),
|
||||||
DmlMeta::sequenced(SequenceNumber::new(42), iox_time::Time::MIN, None, 42),
|
None,
|
||||||
);
|
);
|
||||||
|
|
||||||
// The write portion of this test.
|
// The write portion of this test.
|
||||||
|
@ -198,7 +215,7 @@ mod tests {
|
||||||
|
|
||||||
// Apply the op through the decorator
|
// Apply the op through the decorator
|
||||||
wal_sink
|
wal_sink
|
||||||
.apply(DmlOperation::Write(op.clone()).into())
|
.apply(IngestOp::Write(op.clone()))
|
||||||
.await
|
.await
|
||||||
.expect("wal should not error");
|
.expect("wal should not error");
|
||||||
|
|
||||||
|
@ -236,7 +253,7 @@ mod tests {
|
||||||
|
|
||||||
// The payload should match the serialised form of the "op" originally
|
// The payload should match the serialised form of the "op" originally
|
||||||
// wrote above.
|
// wrote above.
|
||||||
let want = encode_write(NAMESPACE_ID.get(), &op);
|
let want = encode_write_op(NAMESPACE_ID, &op);
|
||||||
|
|
||||||
assert_eq!(want, *payload);
|
assert_eq!(want, *payload);
|
||||||
}
|
}
|
||||||
|
@ -302,4 +319,18 @@ mod tests {
|
||||||
let duration = tokio::time::Instant::now().duration_since(start);
|
let duration = tokio::time::Instant::now().duration_since(start);
|
||||||
assert!(duration > DELEGATE_APPLY_TIMEOUT);
|
assert!(duration > DELEGATE_APPLY_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(savage): Move to a better location
|
||||||
|
fn encode_write_op(namespace_id: NamespaceId, op: &WriteOperation) -> DatabaseBatch {
|
||||||
|
DatabaseBatch {
|
||||||
|
database_id: namespace_id.get(),
|
||||||
|
partition_key: op.partition_key().to_string(),
|
||||||
|
table_batches: op
|
||||||
|
.tables()
|
||||||
|
.map(|(table_id, batch)| {
|
||||||
|
encode_batch(table_id.get(), batch.partitioned_data().data())
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue