Merge pull request #1743 from influxdata/cn/kafka-write

feat: Write entries to Kafka
kodiakhq[bot] 2021-06-24 12:50:48 +00:00 committed by GitHub
commit 6e9a735999
13 changed files with 366 additions and 341 deletions
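
Most of this diff is mechanical: WriteBuffer::store_entry becomes async because the Kafka producer returns a delivery future, so Db::store_entry and the write_lp/try_write_lp test helpers become async as well, and every call site gains .await. Below is a minimal, self-contained sketch of that pattern using the async-trait crate; the type and function names are illustrative stand-ins, not the IOx types.

use async_trait::async_trait;
use std::sync::Mutex;

#[async_trait]
trait WriteBuffer: Send + Sync {
    /// Store a payload and return (sequencer id, sequence number), mirroring `Sequence` in the diff.
    async fn store_entry(&self, payload: &[u8]) -> Result<(u32, u64), String>;
}

/// Stand-in for the mock buffer used in the server tests; not an IOx type.
#[derive(Default)]
struct InMemoryBuffer {
    entries: Mutex<Vec<Vec<u8>>>,
}

#[async_trait]
impl WriteBuffer for InMemoryBuffer {
    async fn store_entry(&self, payload: &[u8]) -> Result<(u32, u64), String> {
        let mut entries = self.entries.lock().unwrap();
        let offset = entries.len() as u64;
        entries.push(payload.to_vec());
        Ok((0, offset))
    }
}

#[tokio::main]
async fn main() {
    let buffer = InMemoryBuffer::default();
    // Call sites now need `.await`, exactly like the `write_lp(...).await` changes below.
    let (sequencer_id, number) = buffer.store_entry(b"cpu bar=1 10").await.unwrap();
    println!("stored at sequencer {} with sequence number {}", sequencer_id, number);
}
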

Cargo.lock generated

@ -3781,6 +3781,7 @@ dependencies = [
"query",
"rand 0.8.3",
"rand_distr",
"rdkafka",
"read_buffer",
"serde",
"serde_json",


@ -163,7 +163,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
write_lp(&db, &data).await;
let scenario1 = DbScenario {
scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(),
db,
@ -171,7 +171,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
write_lp(&db, &data).await;
db.rollover_partition("h2o", "2020-03-01T00").await.unwrap();
db.rollover_partition("h2o", "2020-03-02T00").await.unwrap();
let scenario2 = DbScenario {
@ -183,7 +183,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
write_lp(&db, &data).await;
// roll over and load chunks into both RUB and OS
rollover_and_load(&db, "2020-03-01T00", "h2o").await;
rollover_and_load(&db, "2020-03-02T00", "h2o").await;


@ -19,8 +19,8 @@ async fn setup() -> TestDb {
let db = &test_db.db;
// Chunk 0 has bar:[1-2]
write_lp(db, "cpu bar=1 10");
write_lp(db, "cpu bar=2 20");
write_lp(db, "cpu bar=1 10").await;
write_lp(db, "cpu bar=2 20").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -33,9 +33,9 @@ async fn setup() -> TestDb {
.unwrap();
// Chunk 1 has bar:[3-3] (going to get pruned)
write_lp(db, "cpu bar=3 10");
write_lp(db, "cpu bar=3 100");
write_lp(db, "cpu bar=3 1000");
write_lp(db, "cpu bar=3 10").await;
write_lp(db, "cpu bar=3 100").await;
write_lp(db, "cpu bar=3 1000").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db


@ -57,7 +57,7 @@ impl DbSetup for NoData {
//
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data);
write_lp(&db, data).await;
// move data out of open chunk
assert_eq!(
db.rollover_partition(table_name, partition_key)
@ -95,7 +95,7 @@ impl DbSetup for NoData {
//
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data);
write_lp(&db, data).await;
// move data out of open chunk
assert_eq!(
db.rollover_partition(table_name, partition_key)
@ -285,7 +285,7 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
"o2,state=CA temp=79.0 300",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -307,7 +307,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -318,7 +318,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer and read buffer".into(),
@ -342,7 +342,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
"h2o,state=MA temp=70.4 50",
"h2o,state=MA other_temp=70.4 250",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -353,7 +353,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
"h2o,city=Boston other_temp=72.4 350",
"h2o,city=Boston temp=53.4,reading=51 50",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 1)
.await
@ -385,7 +385,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 0)
.await
@ -402,7 +402,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 1)
.await
@ -419,7 +419,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 2)
.await
@ -436,7 +436,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition("h2o", partition_key).await.unwrap();
db.load_chunk_to_read_buffer("h2o", partition_key, 3)
.await
@ -467,7 +467,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
"h2o,state=MA,city=Boston other_temp=70.4 250",
]
.join("\n"),
);
)
.await;
// Use a background task to do the work. Note: when I used
// TaskTracker::join, it ended up hanging for reasons I don't
@ -479,7 +480,8 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
write_lp(
&db,
&vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"),
);
)
.await;
db.write_chunk_to_object_store("h2o", partition_key, 0)
.await
@ -536,7 +538,7 @@ impl DbSetup for EndToEndTest {
let lp_data = lp_lines.join("\n");
let db = make_db().await.db;
write_lp(&db, &lp_data);
write_lp(&db, &lp_data).await;
let scenario1 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
@ -552,7 +554,7 @@ impl DbSetup for EndToEndTest {
pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().await.db;
write_lp(&db, data);
write_lp(&db, data).await;
let scenario1 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
@ -560,7 +562,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 2: One closed chunk in MUB
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -573,7 +575,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 3: One closed chunk in RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -589,7 +591,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 4: One closed chunk in both RUB and OS
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -609,7 +611,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
// Scenario 5: One closed chunk in OS only
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -643,8 +645,8 @@ pub async fn make_two_chunk_scenarios(
data2: &str,
) -> Vec<DbScenario> {
let db = make_db().await.db;
write_lp(&db, data1);
write_lp(&db, data2);
write_lp(&db, data1).await;
write_lp(&db, data2).await;
let scenario1 = DbScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
@ -652,13 +654,13 @@ pub async fn make_two_chunk_scenarios(
// spread across 2 mutable buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
write_lp(&db, data2);
write_lp(&db, data2).await;
let scenario2 = DbScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
db,
@ -666,7 +668,7 @@ pub async fn make_two_chunk_scenarios(
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -675,7 +677,7 @@ pub async fn make_two_chunk_scenarios(
.await
.unwrap();
}
write_lp(&db, data2);
write_lp(&db, data2).await;
let scenario3 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
db,
@ -683,13 +685,13 @@ pub async fn make_two_chunk_scenarios(
// in 2 read buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
let table_names = write_lp(&db, data2);
let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -710,13 +712,13 @@ pub async fn make_two_chunk_scenarios(
// in 2 read buffer chunks that also loaded into object store
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
let table_names = write_lp(&db, data2);
let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -745,13 +747,13 @@ pub async fn make_two_chunk_scenarios(
// Scenario 6: Two closed chunks in OS only
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
}
let table_names = write_lp(&db, data2);
let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -807,7 +809,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
) -> Vec<DbScenario> {
// Scenario 1: One closed chunk in RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
@ -823,7 +825,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
// Scenario 2: One closed chunk in Parquet only
let db = make_db().await.db;
let table_names = write_lp(&db, data);
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await


@ -38,6 +38,7 @@ parquet_file = { path = "../parquet_file" }
query = { path = "../query" }
rand = "0.8.3"
rand_distr = "0.4.0"
rdkafka = "0.26.0"
read_buffer = { path = "../read_buffer" }
serde = "1.0"
serde_json = "1.0"


@ -11,9 +11,8 @@ use query::exec::Executor;
/// This module contains code for managing the configuration of the server.
use crate::{
db::{catalog::Catalog, Db},
write_buffer::KafkaBuffer,
Error, JobRegistry, Result,
db::{catalog::Catalog, DatabaseToCommit, Db},
write_buffer, Error, JobRegistry, Result,
};
use observability_deps::tracing::{self, error, info, warn, Instrument};
use tokio::task::JoinHandle;
@ -231,42 +230,16 @@ impl Config {
}
/// Creates database in initialized state.
fn commit_db(
&self,
rules: DatabaseRules,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
) {
fn commit_db(&self, database_to_commit: DatabaseToCommit) {
let mut state = self.state.write().expect("mutex poisoned");
let name = rules.name.clone();
let name = database_to_commit.rules.name.clone();
if self.shutdown.is_cancelled() {
error!("server is shutting down");
return;
}
// Right now, `KafkaBuffer` is the only production implementation of the `WriteBuffer`
// trait, so always use `KafkaBuffer` when there is a write buffer connection string
// specified. If/when there are other kinds of write buffers, additional configuration will
// be needed to determine what kind of write buffer to use here.
let write_buffer = rules
.write_buffer_connection_string
.as_ref()
.map(|conn| Arc::new(KafkaBuffer::new(conn)) as _);
let db = Arc::new(Db::new(
rules,
server_id,
object_store,
exec,
Arc::clone(&self.jobs),
preserved_catalog,
catalog,
write_buffer,
));
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.jobs)));
let shutdown = self.shutdown.child_token();
let shutdown_captured = shutdown.clone();
@ -447,32 +420,17 @@ impl<'a> CreateDatabaseHandle<'a> {
///
/// Will fail if database name used to create this handle and the name within `rules` do not match. In this case,
/// the database will be de-registered.
pub(crate) fn commit_db(
mut self,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
rules: DatabaseRules,
) -> Result<()> {
pub(crate) fn commit_db(mut self, database_to_commit: DatabaseToCommit) -> Result<()> {
let db_name = self.db_name.take().expect("not committed");
if db_name != rules.name {
if db_name != database_to_commit.rules.name {
self.config.forget_reservation(&db_name);
return Err(Error::RulesDatabaseNameMismatch {
actual: rules.name.to_string(),
actual: database_to_commit.rules.name.to_string(),
expected: db_name.to_string(),
});
}
self.config.commit_db(
rules,
server_id,
object_store,
exec,
preserved_catalog,
catalog,
);
self.config.commit_db(database_to_commit);
Ok(())
}
@ -557,14 +515,20 @@ impl<'a> RecoverDatabaseHandle<'a> {
});
}
self.config.commit_db(
rules,
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
let database_to_commit = DatabaseToCommit {
server_id,
object_store,
exec,
preserved_catalog,
catalog,
);
rules,
write_buffer,
};
self.config.commit_db(database_to_commit);
Ok(())
}
@ -652,16 +616,17 @@ mod test {
{
let db_reservation = config.create_db(DatabaseName::new("bar").unwrap()).unwrap();
let err = db_reservation
.commit_db(
server_id,
Arc::clone(&store),
Arc::clone(&exec),
preserved_catalog,
catalog,
rules.clone(),
)
.unwrap_err();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: Arc::clone(&store),
exec: Arc::clone(&exec),
preserved_catalog,
catalog,
rules: rules.clone(),
write_buffer: None,
};
let err = db_reservation.commit_db(database_to_commit).unwrap_err();
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
}
@ -675,9 +640,16 @@ mod test {
.await
.unwrap();
let db_reservation = config.create_db(name.clone()).unwrap();
db_reservation
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.unwrap();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer: None,
};
db_reservation.commit_db(database_to_commit).unwrap();
assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -856,9 +828,18 @@ mod test {
)
.await
.unwrap();
db_reservation
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.unwrap();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer: None,
};
db_reservation.commit_db(database_to_commit).unwrap();
let token = config
.state


@ -360,26 +360,28 @@ pub async fn load_or_create_preserved_catalog(
}
}
impl Db {
#[allow(clippy::clippy::too_many_arguments)]
pub fn new(
rules: DatabaseRules,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
jobs: Arc<JobRegistry>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
write_buffer: Option<Arc<dyn WriteBuffer>>,
) -> Self {
let db_name = rules.name.clone();
/// All the information needed to commit a database
#[derive(Debug)]
pub(crate) struct DatabaseToCommit {
pub(crate) server_id: ServerId,
pub(crate) object_store: Arc<ObjectStore>,
pub(crate) exec: Arc<Executor>,
pub(crate) preserved_catalog: PreservedCatalog,
pub(crate) catalog: Catalog,
pub(crate) rules: DatabaseRules,
pub(crate) write_buffer: Option<Arc<dyn WriteBuffer>>,
}
let rules = RwLock::new(rules);
let server_id = server_id;
let store = Arc::clone(&object_store);
let metrics_registry = Arc::clone(&catalog.metrics_registry);
let metric_labels = catalog.metric_labels.clone();
let catalog = Arc::new(catalog);
impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self {
let db_name = database_to_commit.rules.name.clone();
let rules = RwLock::new(database_to_commit.rules);
let server_id = database_to_commit.server_id;
let store = Arc::clone(&database_to_commit.object_store);
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_labels = database_to_commit.catalog.metric_labels.clone();
let catalog = Arc::new(database_to_commit.catalog);
let catalog_access = QueryCatalogAccess::new(
&db_name,
@ -396,8 +398,8 @@ impl Db {
rules,
server_id,
store,
exec,
preserved_catalog: Arc::new(preserved_catalog),
exec: database_to_commit.exec,
preserved_catalog: Arc::new(database_to_commit.preserved_catalog),
catalog,
jobs,
metrics_registry,
@ -406,7 +408,7 @@ impl Db {
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_labels,
write_buffer,
write_buffer: database_to_commit.write_buffer,
}
}
@ -862,7 +864,7 @@ impl Db {
}
/// Stores an entry based on the configuration.
pub fn store_entry(&self, entry: Entry) -> Result<()> {
pub async fn store_entry(&self, entry: Entry) -> Result<()> {
let immutable = {
let rules = self.rules.read();
rules.lifecycle_rules.immutable
@ -873,13 +875,19 @@ impl Db {
// If only the write buffer is configured, this is passing the data through to
// the write buffer, and it's not an error. We ignore the returned metadata; it
// will get picked up when data is read from the write buffer.
let _ = write_buffer.store_entry(&entry).context(WriteBufferError)?;
let _ = write_buffer
.store_entry(&entry)
.await
.context(WriteBufferError)?;
Ok(())
}
(Some(write_buffer), false) => {
// If using both write buffer and mutable buffer, we want to wait for the write
// buffer to return success before adding the entry to the mutable buffer.
let sequence = write_buffer.store_entry(&entry).context(WriteBufferError)?;
let sequence = write_buffer
.store_entry(&entry)
.await
.context(WriteBufferError)?;
let sequenced_entry = Arc::new(
SequencedEntry::new_from_sequence(sequence, entry)
.context(SequencedEntryError)?,
@ -1169,32 +1177,29 @@ pub mod test_helpers {
use std::collections::HashSet;
/// Try to write line protocol data and return all tables that were written.
pub fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
let entries = lp_to_entries(lp);
let mut tables = HashSet::new();
for entry in &entries {
for entry in entries {
if let Some(writes) = entry.partition_writes() {
for write in writes {
for batch in write.table_batches() {
tables.insert(batch.name().to_string());
}
}
db.store_entry(entry).await?;
}
}
entries
.into_iter()
.try_for_each(|entry| db.store_entry(entry))?;
let mut tables: Vec<_> = tables.into_iter().collect();
tables.sort();
Ok(tables)
}
/// Same as [`try_write_lp`](try_write_lp) but will panic on failure.
pub fn write_lp(db: &Db, lp: &str) -> Vec<String> {
try_write_lp(db, lp).unwrap()
pub async fn write_lp(db: &Db, lp: &str) -> Vec<String> {
try_write_lp(db, lp).await.unwrap()
}
}
@ -1251,7 +1256,7 @@ mod tests {
let db = make_db().await.db;
db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry);
let res = db.store_entry(entry).await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot write to this database: no mutable buffer configured"
@ -1272,7 +1277,7 @@ mod tests {
test_db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry).unwrap();
test_db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
}
@ -1289,7 +1294,7 @@ mod tests {
.db;
let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry).unwrap();
test_db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
@ -1310,7 +1315,7 @@ mod tests {
async fn read_write() {
// This test also exercises the path without a write buffer.
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let batches = run_query(db, "select * from cpu").await;
@ -1344,7 +1349,7 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
// A chunk has been opened
test_db
@ -1363,7 +1368,7 @@ mod tests {
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 10");
write_lp(db.as_ref(), "cpu bar=2 10").await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
@ -1472,7 +1477,7 @@ mod tests {
#[tokio::test]
async fn write_with_rollover() {
let db = Arc::new(make_db().await.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db
@ -1493,7 +1498,7 @@ mod tests {
assert_batches_sorted_eq!(expected, &batches);
// add new data
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=2 20").await;
let expected = vec![
"+-----+-------------------------------+",
"| bar | time |",
@ -1530,7 +1535,7 @@ mod tests {
"cpu,core=one user=10.0 11",
];
write_lp(db.as_ref(), &lines.join("\n"));
write_lp(db.as_ref(), &lines.join("\n")).await;
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
let mb_chunk = db
@ -1559,8 +1564,8 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -1645,12 +1650,12 @@ mod tests {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20");
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10");
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20");
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10");
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5");
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
write_lp(db.as_ref(), "cpu,tag1=asfd,tag2=foo bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bingo,tag2=foo bar=2 10").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 20").await;
write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10").await;
write_lp(db.as_ref(), "cpu,tag2=a bar=3 5").await;
let partition_key = "1970-01-01T00";
let mb_chunk = db
@ -1760,8 +1765,8 @@ mod tests {
let db = Arc::new(test_db.db);
// Write some line protocol into the mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
@ -1859,8 +1864,8 @@ mod tests {
let db = Arc::new(test_db.db);
// Write some line protocol into the mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// Now mark the MB chunk closed
let partition_key = "1970-01-01T00";
@ -1970,7 +1975,7 @@ mod tests {
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let after_write = Utc::now();
let last_write_prev = {
@ -1983,7 +1988,7 @@ mod tests {
partition.last_write_at()
};
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 20").await;
{
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let partition = partition.read();
@ -1997,7 +2002,7 @@ mod tests {
let db = Arc::new(make_db().await.db);
// Given data loaded into two chunks
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 10").await;
let after_data_load = Utc::now();
// When the chunk is rolled over
@ -2035,8 +2040,8 @@ mod tests {
db.rules.write().lifecycle_rules.mutable_size_threshold =
Some(NonZeroUsize::new(2).unwrap());
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 10").await;
write_lp(&db, "cpu bar=1 20").await;
let partitions = db.catalog.partition_keys();
assert_eq!(partitions.len(), 1);
@ -2066,10 +2071,10 @@ mod tests {
#[tokio::test]
async fn chunks_sorted_by_times() {
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu val=1 1");
write_lp(&db, "mem val=2 400000000000001");
write_lp(&db, "cpu val=1 2");
write_lp(&db, "mem val=2 400000000000002");
write_lp(&db, "cpu val=1 1").await;
write_lp(&db, "mem val=2 400000000000001").await;
write_lp(&db, "cpu val=1 2").await;
write_lp(&db, "mem val=2 400000000000002").await;
let sort_rules = SortOrder {
order: Order::Desc,
@ -2101,8 +2106,8 @@ mod tests {
let db = Arc::new(make_db().await.db);
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 20");
write_lp(&db, "cpu bar=1 10").await;
write_lp(&db, "cpu bar=1 20").await;
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0]);
assert_eq!(
@ -2120,7 +2125,7 @@ mod tests {
// add a new chunk in mutable buffer, and move chunk1 (but
// not chunk 0) to read buffer
write_lp(&db, "cpu bar=1 30");
write_lp(&db, "cpu bar=1 30").await;
let mb_chunk = db
.rollover_partition("cpu", "1970-01-01T00")
.await
@ -2130,7 +2135,7 @@ mod tests {
.await
.unwrap();
write_lp(&db, "cpu bar=1 40");
write_lp(&db, "cpu bar=1 40").await;
assert_eq!(mutable_chunk_ids(&db, partition_key), vec![0, 2]);
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]);
@ -2169,11 +2174,11 @@ mod tests {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000");
write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2212,9 +2217,9 @@ mod tests {
async fn partition_chunk_summaries_timestamp() {
let db = Arc::new(make_db().await.db);
let start = Utc::now();
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
let after_first_write = Utc::now();
write_lp(&db, "cpu bar=2 2");
write_lp(&db, "cpu bar=2 2").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
let after_close = Utc::now();
@ -2264,11 +2269,11 @@ mod tests {
let db = Arc::new(make_db().await.db);
// get three chunks: one open, one closed in mb and one close in rb
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
write_lp(&db, "cpu bar=1,baz=2 2");
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000");
write_lp(&db, "cpu bar=1,baz=2 2").await;
write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2283,7 +2288,7 @@ mod tests {
print!("Partitions2: {:?}", db.partition_keys().unwrap());
db.rollover_partition("cpu", "1970-01-05T15").await.unwrap();
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await;
let chunk_summaries = db.chunk_summaries().expect("expected summary to return");
let chunk_summaries = normalize_summaries(chunk_summaries);
@ -2345,15 +2350,15 @@ mod tests {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
write_lp(&db, "cpu bar=1 1").await;
let chunk_id = db
.rollover_partition("cpu", "1970-01-01T00")
.await
.unwrap()
.unwrap()
.id();
write_lp(&db, "cpu bar=2,baz=3.0 2");
write_lp(&db, "mem foo=1 1");
write_lp(&db, "cpu bar=2,baz=3.0 2").await;
write_lp(&db, "mem foo=1 1").await;
// load a chunk to the read buffer
db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
@ -2366,8 +2371,8 @@ mod tests {
.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1 400000000000000");
write_lp(&db, "mem frob=3 400000000000001");
write_lp(&db, "cpu bar=1 400000000000000").await;
write_lp(&db, "mem frob=3 400000000000001").await;
print!("Partitions: {:?}", db.partition_keys().unwrap());
@ -2532,8 +2537,8 @@ mod tests {
let db = Arc::new(make_db().await.db);
// create MB partition
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
write_lp(db.as_ref(), "cpu bar=1 10").await;
write_lp(db.as_ref(), "cpu bar=2 20").await;
// MB => RB
let partition_key = "1970-01-01T00";
@ -2566,11 +2571,11 @@ mod tests {
db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());
// inserting first line does not trigger hard buffer limit
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
// but second line will
assert!(matches!(
try_write_lp(db.as_ref(), "cpu bar=2 20"),
try_write_lp(db.as_ref(), "cpu bar=2 20").await,
Err(super::Error::HardLimitReached {})
));
}
@ -2593,7 +2598,7 @@ mod tests {
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
test_db
.metric_registry
@ -2829,7 +2834,7 @@ mod tests {
}
// ==================== check: DB still writable ====================
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
@ -2996,7 +3001,7 @@ mod tests {
}
// ==================== check: DB still writable ====================
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
@ -3015,7 +3020,7 @@ mod tests {
}
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10");
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";
let table_name = "cpu";


@ -23,8 +23,8 @@ use tokio::sync::Semaphore;
use crate::{
config::{Config, DB_RULES_FILE_NAME},
db::load_or_create_preserved_catalog,
DatabaseError,
db::{load_or_create_preserved_catalog, DatabaseToCommit},
write_buffer, DatabaseError,
};
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
@ -71,6 +71,9 @@ pub enum Error {
DatabaseNameError {
source: data_types::DatabaseNameError,
},
#[snafu(display("Cannot create write buffer for writing: {}", source))]
CreateWriteBufferForWriting { source: DatabaseError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -326,9 +329,22 @@ impl InitStatus {
.context(CatalogLoadError)
{
Ok((preserved_catalog, catalog)) => {
let write_buffer =
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer,
};
// everything is there, can create DB
handle
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.commit_db(database_to_commit)
.map_err(Box::new)
.context(CreateDbError)?;
Ok(())


@ -73,7 +73,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::load_or_create_preserved_catalog;
use db::{load_or_create_preserved_catalog, DatabaseToCommit};
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::Mutex;
@ -207,6 +207,9 @@ pub enum Error {
#[snafu(display("cannot get id: {}", source))]
GetIdError { source: crate::init::Error },
#[snafu(display("cannot create write buffer for writing: {}", source))]
CreatingWriteBufferForWriting { source: DatabaseError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -498,14 +501,20 @@ where
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
db_reservation.commit_db(
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
let database_to_commit = DatabaseToCommit {
server_id,
Arc::clone(&self.store),
Arc::clone(&self.exec),
object_store: Arc::clone(&self.store),
exec: Arc::clone(&self.exec),
preserved_catalog,
catalog,
rules,
)?;
write_buffer,
};
db_reservation.commit_db(database_to_commit)?;
Ok(())
}
@ -714,7 +723,7 @@ where
pub async fn write_entry_local(&self, db_name: &str, db: &Db, entry: Entry) -> Result<()> {
let bytes = entry.data().len() as u64;
db.store_entry(entry).map_err(|e| {
db.store_entry(entry).await.map_err(|e| {
self.metrics.ingest_entries_bytes_total.add_with_labels(
bytes,
&[

View File

@ -8,7 +8,7 @@ use object_store::{memory::InMemory, ObjectStore};
use query::{exec::Executor, QueryDatabase};
use crate::{
db::{load_or_create_preserved_catalog, Db},
db::{load_or_create_preserved_catalog, DatabaseToCommit, Db},
write_buffer::WriteBuffer,
JobRegistry,
};
@ -78,18 +78,19 @@ impl TestDbBuilder {
rules.lifecycle_rules.catalog_transactions_until_checkpoint =
self.catalog_transactions_until_checkpoint;
let database_to_commit = DatabaseToCommit {
rules,
server_id,
object_store,
preserved_catalog,
catalog,
write_buffer: self.write_buffer,
exec,
};
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Db::new(
rules,
server_id,
object_store,
exec,
Arc::new(JobRegistry::new()),
preserved_catalog,
catalog,
self.write_buffer,
),
db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
}
}


@ -1,36 +1,107 @@
use async_trait::async_trait;
use data_types::database_rules::DatabaseRules;
use entry::{Entry, Sequence};
use rdkafka::{
error::KafkaError,
producer::{FutureProducer, FutureRecord},
ClientConfig,
};
use std::{convert::TryInto, sync::Arc};
pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
pub fn new(rules: &DatabaseRules) -> Result<Option<Arc<dyn WriteBuffer>>, WriteBufferError> {
let name = rules.db_name();
// Right now, `KafkaBuffer` is the only production implementation of the `WriteBuffer`
// trait, so always use `KafkaBuffer` when there is a write buffer connection string
// specified. If/when there are other kinds of write buffers, additional configuration will
// be needed to determine what kind of write buffer to use here.
match rules.write_buffer_connection_string.as_ref() {
Some(conn) => {
let kafka_buffer = KafkaBuffer::new(conn, name)?;
Ok(Some(Arc::new(kafka_buffer) as _))
}
None => Ok(None),
}
}
/// A Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading entries
/// from the Write Buffer at a later time.
#[async_trait]
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
/// Send an `Entry` to the write buffer and return information that can be used to restore
/// entries at a later time.
fn store_entry(
&self,
entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError>;
// TODO: interface for restoring, will look something like:
// fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
// async fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
}
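
For orientation, the Sequence returned by store_entry is what Db::store_entry (see the db.rs hunks above) pairs with the entry when both a write buffer and a mutable buffer are configured. A condensed sketch of that consumer side follows; the SequencedEntry import and the error conversion via `?` are assumptions for brevity, where the real code uses snafu's `.context(...)`.

// Sketch only: mirrors the `(Some(write_buffer), false)` arm of `Db::store_entry`.
// `SequencedEntry` is assumed to come from the `entry` crate and its error to
// convert into `WriteBufferError`.
async fn write_through(
    write_buffer: &dyn WriteBuffer,
    entry: Entry,
) -> Result<std::sync::Arc<SequencedEntry>, WriteBufferError> {
    // Wait for the write buffer (e.g. Kafka) to acknowledge the entry first...
    let sequence = write_buffer.store_entry(&entry).await?;
    // ...then pair the entry with its assigned sequence before it is exposed locally.
    let sequenced = SequencedEntry::new_from_sequence(sequence, entry)?;
    Ok(std::sync::Arc::new(sequenced))
}
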
#[derive(Debug)]
pub struct KafkaBuffer {
conn: String,
database_name: String,
producer: FutureProducer,
}
// Needed because rdkafka's FutureProducer doesn't impl Debug
impl std::fmt::Debug for KafkaBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KafkaBuffer")
.field("conn", &self.conn)
.field("database_name", &self.database_name)
.finish()
}
}
#[async_trait]
impl WriteBuffer for KafkaBuffer {
fn store_entry(
&self,
_entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
unimplemented!()
/// Send an `Entry` to Kafka and return the partition ID as the sequencer ID and the offset
/// as the sequence number.
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
// This type annotation is necessary because `FutureRecord` is generic over key type, but
// key is optional and we're not setting a key. `String` is arbitrary.
let record: FutureRecord<'_, String, _> =
FutureRecord::to(&self.database_name).payload(entry.data());
// Can't use `?` here because `send_result` returns `Err((E: Error, original_msg))` so we
// have to extract the actual error out with a `match`.
let (partition, offset) = match self.producer.send_result(record) {
// Same error structure on the result of the future, need to `match`
Ok(delivery_future) => match delivery_future.await? {
Ok((partition, offset)) => (partition, offset),
Err((e, _returned_record)) => return Err(Box::new(e)),
},
Err((e, _returned_record)) => return Err(Box::new(e)),
};
Ok(Sequence {
id: partition.try_into()?,
number: offset.try_into()?,
})
}
}
impl KafkaBuffer {
pub fn new(conn: impl Into<String>) -> Self {
Self { conn: conn.into() }
pub fn new(
conn: impl Into<String>,
database_name: impl Into<String>,
) -> Result<Self, KafkaError> {
let conn = conn.into();
let database_name = database_name.into();
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", &conn);
cfg.set("message.timeout.ms", "5000");
let producer: FutureProducer = cfg.create()?;
Ok(Self {
conn,
database_name,
producer,
})
}
}
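
A hedged usage sketch of the new constructor and send path: the broker address, database name, and pre-built entry below are illustrative assumptions. In the server, write_buffer::new builds the buffer from DatabaseRules and Db::store_entry drives it.

// Sketch only: assumes a reachable Kafka broker and an `Entry` produced by the
// normal IOx write path.
async fn demo_kafka_buffer(entry: &Entry) -> Result<(), WriteBufferError> {
    // The topic name is the database name; the broker address is illustrative.
    let buffer = KafkaBuffer::new("localhost:9093", "my_db")?;
    let sequence = buffer.store_entry(entry).await?;
    // The partition becomes the sequencer id and the offset the sequence number.
    println!("sequencer {} / sequence number {}", sequence.id, sequence.number);
    Ok(())
}
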
@ -43,11 +114,9 @@ pub mod test_helpers {
pub entries: Arc<Mutex<Vec<Entry>>>,
}
#[async_trait]
impl WriteBuffer for MockBuffer {
fn store_entry(
&self,
entry: &Entry,
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
let mut entries = self.entries.lock().unwrap();
let offset = entries.len() as u64;
entries.push(entry.clone());


@ -73,7 +73,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
let partition_key = "1970-01-01T00";
for chunk_id in 0..N_CHUNKS {
let table_names = write_lp(&db, &lp);
let table_names = write_lp(&db, &lp).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)


@ -1,16 +1,13 @@
use futures::{stream::FuturesUnordered, StreamExt};
use crate::{
common::server_fixture::ServerFixture,
end_to_end_cases::scenario::{create_readable_database_plus, rand_name},
};
use entry::Entry;
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
consumer::{Consumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message, Offset, TopicPartitionList,
};
use std::{
array,
convert::TryInto,
time::{SystemTime, UNIX_EPOCH},
};
use std::convert::TryFrom;
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
@ -56,111 +53,54 @@ macro_rules! maybe_skip_integration {
}};
}
// This is the test I actually want to write but can't yet because the code doesn't exist
// #[tokio::test]
// async fn writes_go_to_kafka() {
// // start up kafka
//
// // set up a database with a write buffer pointing at kafka
//
// // write some points
//
// // check the data is in kafka
//
// // stop kafka
// }
// This test validates the Kafka/Docker Compose setup and that the Rust tests can use it
#[tokio::test]
async fn can_connect_to_kafka() {
// TODO: this should be the database name and managed by IOx
const TOPIC: &str = "my-topic22227";
// TODO: this should go away
const NUM_MSGS: usize = 10;
async fn writes_go_to_kafka() {
let kafka_connection = maybe_skip_integration!();
// connect to kafka, produce, and consume
// set up a database with a write buffer pointing at kafka
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection_string = kafka_connection.to_string();
create_readable_database_plus(&db_name, server.grpc_channel(), |mut rules| {
rules.write_buffer_connection_string = write_buffer_connection_string;
rules
})
.await;
// write some points
let mut write_client = server.write_client();
let lp_lines = [
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// check the data is in kafka
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", kafka_connection);
cfg.set("session.timeout.ms", "6000");
cfg.set("enable.auto.commit", "false");
cfg.set("group.id", "placeholder");
let admin_cfg = cfg.clone();
let mut producer_cfg = cfg.clone();
producer_cfg.set("message.timeout.ms", "5000");
let mut consumer_cfg = cfg.clone();
consumer_cfg.set("session.timeout.ms", "6000");
consumer_cfg.set("enable.auto.commit", "false");
consumer_cfg.set("group.id", "placeholder");
let admin: AdminClient<DefaultClientContext> = admin_cfg.create().unwrap();
let producer: FutureProducer = producer_cfg.create().unwrap();
let consumer: StreamConsumer = consumer_cfg.create().unwrap();
let topic = NewTopic::new(TOPIC, 1, TopicReplication::Fixed(1));
let opts = AdminOptions::default();
admin.create_topics(&[topic], &opts).await.unwrap();
let consumer: StreamConsumer = cfg.create().unwrap();
let mut topics = TopicPartitionList::new();
topics.add_partition(TOPIC, 0);
topics.add_partition(&db_name, 0);
topics
.set_partition_offset(TOPIC, 0, Offset::Beginning)
.set_partition_offset(&db_name, 0, Offset::Beginning)
.unwrap();
consumer.assign(&topics).unwrap();
let consumer_task = tokio::spawn(async move {
eprintln!("Consumer task starting");
let message = consumer.recv().await.unwrap();
assert_eq!(message.topic(), db_name);
let mut counter = NUM_MSGS;
loop {
let p = consumer.recv().await.unwrap();
eprintln!("Received a {:?}", p.payload().map(String::from_utf8_lossy));
counter -= 1;
if counter == 0 {
break;
}
}
assert_eq!(counter, 0);
eprintln!("Exiting Consumer");
});
// TODO all the producing should move to server/src/write_buffer.rs
let producer_task = tokio::spawn(async move {
eprintln!("Producer task starting");
for i in 0..NUM_MSGS {
let s = format!("hello! {}", i);
let record = FutureRecord::to(TOPIC).key(&s).payload(&s).timestamp(now());
match producer.send_result(record) {
Ok(x) => match x.await.unwrap() {
Ok((partition, offset)) => {
// TODO remove all the dbg
dbg!(&s, partition, offset);
}
Err((e, msg)) => panic!("oh no {}, {:?}", e, msg),
},
Err((e, msg)) => panic!("oh no {}, {:?}", e, msg),
}
eprintln!("Sent {}", i);
}
eprintln!("exiting producer");
});
let mut tasks: FuturesUnordered<_> =
array::IntoIter::new([consumer_task, producer_task]).collect();
while let Some(t) = tasks.next().await {
t.unwrap();
}
}
fn now() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap()
let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
let partition_writes = entry.partition_writes().unwrap();
assert_eq!(partition_writes.len(), 2);
}