feat: add no_sync write_lp param for fast writes (#25902)

Michael Gattozzi 2025-01-24 13:34:38 -05:00 committed by GitHub
parent 061b62a09b
commit 43e186d761
9 changed files with 151 additions and 50 deletions

View File

@ -342,3 +342,54 @@ async fn writes_with_different_schema_should_fail() {
"the request should hae failed with an API Error"
);
}
#[tokio::test]
/// Check that the no_sync param is accepted by every write endpoint. For the v1 and v2
/// endpoints this only means that serde parses it without error; the param only takes effect
/// on the v3 endpoint, and the default is still to wait for the WAL to sync before returning.
async fn api_no_sync_param() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let v1_write_url = format!("{base}/write", base = server.client_addr());
let v2_write_url = format!("{base}/api/v2/write", base = server.client_addr());
let v3_write_url = format!("{base}/api/v3/write_lp", base = server.client_addr());
let write_body = "cpu,host=a usage=0.5";
let params = vec![("db", "foo"), ("no_sync", "true")];
let resp = client
.post(&v1_write_url)
.query(&params)
.body(write_body)
.send()
.await
.expect("send /write request");
let status = resp.status();
let body = resp.text().await.expect("response body as text");
println!("Response [{status}]:\n{body}");
assert_eq!(status, StatusCode::NO_CONTENT);
let params = vec![("bucket", "foo"), ("no_sync", "true")];
let resp = client
.post(&v2_write_url)
.query(&params)
.body(write_body)
.send()
.await
.expect("send api/v2/write request");
let status = resp.status();
let body = resp.text().await.expect("response body as text");
println!("Response [{status}]:\n{body}");
assert_eq!(status, StatusCode::NO_CONTENT);
let params = vec![("db", "foo"), ("no_sync", "true")];
let resp = client
.post(&v3_write_url)
.query(&params)
.body(write_body)
.send()
.await
.expect("send api/v3/write request");
let status = resp.status();
let body = resp.text().await.expect("response body as text");
println!("Response [{status}]:\n{body}");
assert_eq!(status, StatusCode::NO_CONTENT);
}
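For context, a minimal client-side sketch of the intended fast-write path (illustrative only, not part of this change): it assumes a server reachable at a placeholder address and a pre-existing database named foo, and only the v3 endpoint honors no_sync.

use reqwest::Client;

// Sketch: fire a low-latency write against the v3 endpoint with no_sync=true.
// The address and database name are placeholders for illustration.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();
    let resp = client
        .post("http://localhost:8181/api/v3/write_lp")
        .query(&[("db", "foo"), ("no_sync", "true")])
        .body("cpu,host=a usage=0.5")
        .send()
        .await?;
    // With no_sync=true the server returns before the WAL file is persisted,
    // trading confirmed durability for lower write latency.
    println!("status: {}", resp.status());
    Ok(())
}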

View File

@ -885,6 +885,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
@ -960,6 +961,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
@ -1020,6 +1022,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
@ -1088,6 +1091,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
@ -1182,6 +1186,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
@ -1242,6 +1247,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

View File

@ -426,6 +426,7 @@ mod python_plugin {
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
false,
)
.await
{
@ -447,6 +448,7 @@ mod python_plugin {
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
false,
)
.await
{

View File

@ -518,6 +518,7 @@ where
default_time,
params.accept_partial,
params.precision,
params.no_sync,
)
.await?;
@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams {
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) no_sync: bool,
}
impl From<iox_http::write::WriteParams> for WriteParams {
@ -1624,6 +1627,7 @@ impl From<iox_http::write::WriteParams> for WriteParams {
// legacy behaviour was to not accept partial:
accept_partial: false,
precision: legacy.precision.into(),
no_sync: false,
}
}
}
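A standalone sketch of how the #[serde(default)] annotation behaves for the new field, assuming a serde_urlencoded-style query parser (the struct below is simplified and is not the server's actual WriteParams): no_sync is false when the param is omitted and true only when the query string sets it.

use serde::Deserialize;

// Simplified stand-in for the real WriteParams, for illustration only.
#[derive(Debug, Deserialize)]
struct WriteParamsSketch {
    db: String,
    #[serde(default)]
    no_sync: bool,
}

fn main() {
    let with_flag: WriteParamsSketch =
        serde_urlencoded::from_str("db=foo&no_sync=true").unwrap();
    assert!(with_flag.no_sync);

    // Omitting the param falls back to the default, i.e. the confirmed (synced) path.
    let without_flag: WriteParamsSketch =
        serde_urlencoded::from_str("db=foo").unwrap();
    assert!(!without_flag.no_sync);
}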

View File

@ -899,6 +899,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1010,6 +1011,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1075,6 +1077,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1124,6 +1127,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1189,6 +1193,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();

View File

@ -70,8 +70,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[async_trait]
pub trait Wal: Debug + Send + Sync + 'static {
/// Buffer into a single larger operation in memory. Returns before the operation is persisted.
async fn buffer_op_unconfirmed(&self, op: WalOp) -> Result<(), Error>;
/// Buffers write ops into the in-memory buffer, but returns before the operations are persisted to the WAL.
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> Result<(), Error>;
/// Writes the ops into the buffer and waits until the WAL file is persisted. When this returns
/// the operations are durable in the configured object store and the file notifier has been
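To make the contract difference concrete, here is a toy model of the two paths. It uses plain tokio channels and none of the crate's types, so it only illustrates the idea, not the WAL implementation: an unconfirmed write returns as soon as the op is buffered, while a confirmed write waits for the flush side to acknowledge durability.

use tokio::sync::{mpsc, oneshot};

// Toy stand-ins for WalOp and the two submission paths.
enum Request {
    Unconfirmed(String),
    Confirmed(String, oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(64);

    // Stands in for the periodic WAL flush that persists the buffered ops.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            match req {
                Request::Unconfirmed(op) => println!("buffered, not yet durable: {op}"),
                Request::Confirmed(op, done) => {
                    println!("persisted: {op}");
                    let _ = done.send(()); // acknowledge durability to the waiting writer
                }
            }
        }
    });

    // no_sync = true: enqueue and return immediately.
    tx.send(Request::Unconfirmed("cpu,host=a usage=0.5".into()))
        .await
        .unwrap();

    // no_sync = false (default): wait for the durability acknowledgement.
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Request::Confirmed("cpu,host=b usage=0.9".into(), done_tx))
        .await
        .unwrap();
    done_rx.await.unwrap();
}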

View File

@ -229,12 +229,12 @@ impl WalObjectStore {
}
/// Buffer into a single larger operation in memory. Returns before the operation is persisted.
async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> {
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> crate::Result<(), crate::Error> {
self.flush_buffer
.lock()
.await
.wal_buffer
.buffer_op_unconfirmed(op)
.write_ops_unconfirmed(op)
}
/// Writes the op into the buffer and waits until the WAL file is persisted. When this returns
@ -513,8 +513,8 @@ async fn load_all_wal_file_paths(
#[async_trait::async_trait]
impl Wal for WalObjectStore {
async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> {
self.buffer_op_unconfirmed(op).await
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> crate::Result<(), crate::Error> {
self.write_ops_unconfirmed(op).await
}
async fn write_ops(&self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
@ -719,36 +719,38 @@ pub enum WriteResult {
}
impl WalBuffer {
fn buffer_op_unconfirmed(&mut self, op: WalOp) -> crate::Result<(), crate::Error> {
fn write_ops_unconfirmed(&mut self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
if self.op_count >= self.op_limit {
return Err(crate::Error::BufferFull(self.op_count));
}
match op {
WalOp::Write(new_write_batch) => {
let db_name = Arc::clone(&new_write_batch.database_name);
for op in ops {
match op {
WalOp::Write(new_write_batch) => {
let db_name = Arc::clone(&new_write_batch.database_name);
// insert the database write batch or add to existing
let write_batch =
self.database_to_write_batch
.entry(db_name)
.or_insert_with(|| WriteBatch {
database_id: new_write_batch.database_id,
database_name: new_write_batch.database_name,
table_chunks: Default::default(),
min_time_ns: i64::MAX,
max_time_ns: i64::MIN,
});
write_batch.add_write_batch(
new_write_batch.table_chunks,
new_write_batch.min_time_ns,
new_write_batch.max_time_ns,
);
// insert the database write batch or add to existing
let write_batch =
self.database_to_write_batch
.entry(db_name)
.or_insert_with(|| WriteBatch {
database_id: new_write_batch.database_id,
database_name: new_write_batch.database_name,
table_chunks: Default::default(),
min_time_ns: i64::MAX,
max_time_ns: i64::MIN,
});
write_batch.add_write_batch(
new_write_batch.table_chunks,
new_write_batch.min_time_ns,
new_write_batch.max_time_ns,
);
}
WalOp::Catalog(catalog_batch) => {
self.catalog_batches.push(catalog_batch);
}
WalOp::Noop(_) => {}
}
WalOp::Catalog(catalog_batch) => {
self.catalog_batches.push(catalog_batch);
}
WalOp::Noop(_) => {}
}
Ok(())
@ -760,9 +762,7 @@ impl WalBuffer {
response: oneshot::Sender<WriteResult>,
) -> crate::Result<(), crate::Error> {
self.write_op_responses.push(response);
for op in ops {
self.buffer_op_unconfirmed(op)?;
}
self.write_ops_unconfirmed(ops)?;
Ok(())
}
@ -979,7 +979,7 @@ mod tests {
min_time_ns: 1,
max_time_ns: 3,
});
wal.buffer_op_unconfirmed(op1.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op1.clone()]).await.unwrap();
let op2 = WalOp::Write(WriteBatch {
database_id: DbId::from(0),
@ -1013,7 +1013,7 @@ mod tests {
min_time_ns: 62_000000000,
max_time_ns: 62_000000000,
});
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op2.clone()]).await.unwrap();
// create wal file 1
let ret = wal.flush_buffer(false).await;
@ -1083,7 +1083,7 @@ mod tests {
);
// create wal file 2
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op2.clone()]).await.unwrap();
assert!(wal.flush_buffer(false).await.is_none());
let file_2_contents = create::wal_contents(
@ -1207,7 +1207,7 @@ mod tests {
min_time_ns: 128_000000000,
max_time_ns: 128_000000000,
});
wal.buffer_op_unconfirmed(op3.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op3.clone()]).await.unwrap();
let (snapshot_done, snapshot_info, snapshot_permit) =
wal.flush_buffer(false).await.unwrap();
@ -1394,7 +1394,7 @@ mod tests {
flush_buffer
.wal_buffer
.buffer_op_unconfirmed(WalOp::Write(WriteBatch {
.write_ops_unconfirmed(vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: IndexMap::from([(
@ -1425,7 +1425,7 @@ mod tests {
.into(),
min_time_ns: 128_000000000,
max_time_ns: 148_000000000,
}))
})])
.unwrap();
// wal buffer not empty, force snapshot set => snapshot (empty wal buffer
@ -1452,7 +1452,7 @@ mod tests {
// not snapshot
flush_buffer
.wal_buffer
.buffer_op_unconfirmed(WalOp::Write(WriteBatch {
.write_ops_unconfirmed(vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: IndexMap::from([(
@ -1483,7 +1483,7 @@ mod tests {
.into(),
min_time_ns: 128_000000000,
max_time_ns: 148_000000000,
}))
})])
.unwrap();
let (wal_contents, _, maybe_snapshot) = flush_buffer

View File

@ -100,6 +100,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> write_buffer::Result<BufferedWriteRequest>;
/// Returns the database schema provider

View File

@ -45,7 +45,7 @@ use metric::Registry;
use metrics::WriteMetrics;
use object_store::path::Path as ObjPath;
use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::{debug, error, warn};
use parquet_file::storage::ParquetExecInput;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
@ -265,6 +265,7 @@ impl WriteBufferImpl {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> Result<BufferedWriteRequest> {
debug!("write_lp to {} in writebuffer", db_name);
@ -286,12 +287,16 @@ impl WriteBufferImpl {
}
ops.push(WalOp::Write(result.valid_data));
// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
if no_sync {
self.wal.write_ops_unconfirmed(ops).await?;
} else {
// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
}
// record metrics for lines written, rejected, and bytes written
self.metrics
@ -438,9 +443,17 @@ impl Bufferer for WriteBufferImpl {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> Result<BufferedWriteRequest> {
self.write_lp(database, lp, ingest_time, accept_partial, precision)
.await
self.write_lp(
database,
lp,
ingest_time,
accept_partial,
precision,
no_sync,
)
.await
}
fn catalog(&self) -> Arc<Catalog> {
@ -994,6 +1007,7 @@ mod tests {
Time::from_timestamp_nanos(123),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1018,6 +1032,7 @@ mod tests {
Time::from_timestamp_nanos(124),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1029,6 +1044,7 @@ mod tests {
Time::from_timestamp_nanos(125),
false,
Precision::Nanosecond,
false,
)
.await;
@ -1108,6 +1124,7 @@ mod tests {
Time::from_timestamp(20, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1166,6 +1183,7 @@ mod tests {
Time::from_timestamp(30, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1187,6 +1205,7 @@ mod tests {
Time::from_timestamp(40, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1241,6 +1260,7 @@ mod tests {
Time::from_timestamp(10, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1264,6 +1284,7 @@ mod tests {
Time::from_timestamp(65, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1289,6 +1310,7 @@ mod tests {
Time::from_timestamp(147, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1331,6 +1353,7 @@ mod tests {
Time::from_timestamp(250, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1410,6 +1433,7 @@ mod tests {
Time::from_timestamp(300, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1422,6 +1446,7 @@ mod tests {
Time::from_timestamp(330, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1593,6 +1618,7 @@ mod tests {
Time::from_timestamp(10, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1602,6 +1628,7 @@ mod tests {
Time::from_timestamp(20, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -1611,6 +1638,7 @@ mod tests {
Time::from_timestamp(30, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -2331,6 +2359,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -2359,6 +2388,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -2911,6 +2941,7 @@ mod tests {
Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
@ -2930,6 +2961,7 @@ mod tests {
Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000),
true,
Precision::Nanosecond,
false,
)
.await
.unwrap();