parent
16f1b40efd
commit
52ae60bf2e
|
|
@ -832,7 +832,7 @@ pub struct Partition {
|
|||
/// of the existing columns relative to each other is NOT changed.
|
||||
///
|
||||
/// For example, updating `A,B,C` to either `A,D,B,C` or `A,B,C,D`
|
||||
/// is legal. Howver, updating to `A,C,D,B` is not because the
|
||||
/// is legal. However, updating to `A,C,D,B` is not because the
|
||||
/// relative order of B and C have been reversed.
|
||||
pub sort_key: Vec<String>,
|
||||
}
|
||||
|
|
@ -1086,7 +1086,7 @@ impl ChunkId {
|
|||
|
||||
/// **TESTING ONLY:** Create new ID from integer.
|
||||
///
|
||||
/// Since this can easily lead to ID collissions (which in turn can lead to panics), this must
|
||||
/// Since this can easily lead to ID collisions (which in turn can lead to panics), this must
|
||||
/// only be used for testing purposes!
|
||||
pub fn new_test(id: u128) -> Self {
|
||||
Self(Uuid::from_u128(id))
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ If you want to trace memory allocations, you need to disable [jemalloc] by passi
|
|||
|
||||
|
||||
## Out-of-memory (OOM)
|
||||
When profling a process that may potentially use too much memory and affect your whole system by doing so, you may want
|
||||
When profiling a process that may potentially use too much memory and affect your whole system by doing so, you may want
|
||||
to limit its resources a bit.
|
||||
|
||||
### ulimit
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ impl<T> Job<T> {
|
|||
///
|
||||
/// You must ensure that this task eventually finishes, otherwise [`DedicatedExecutor::join`] may never return!
|
||||
pub fn detach(mut self) {
|
||||
// cannot destructure `Self` because we implement `Drop`, so we use a flag instead to prevent cancelation.
|
||||
// cannot destructure `Self` because we implement `Drop`, so we use a flag instead to prevent cancellation.
|
||||
self.detached = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ enum ShardStatus {
|
|||
SHARD_STATUS_READABLE = 2;
|
||||
|
||||
// The ingester has processed the data in this write and it is both
|
||||
// readable and completly persisted to parquet files.
|
||||
// readable and completely persisted to parquet files.
|
||||
SHARD_STATUS_PERSISTED = 3;
|
||||
|
||||
// The ingester does not have information about this shard
|
||||
|
|
|
|||
|
|
@ -270,7 +270,7 @@ impl VirtualTableBuilder {
|
|||
}))
|
||||
}
|
||||
|
||||
/// register a table provider for this sytem table
|
||||
/// register a table provider for this system table
|
||||
fn build(self, ctx: &mut SessionContext) {
|
||||
let Self {
|
||||
table_name,
|
||||
|
|
|
|||
|
|
@ -199,7 +199,7 @@ fn parse_id(key: impl Iterator<Item = u8>) -> Result<InfluxId, DataError> {
|
|||
///
|
||||
/// Example: sum#!~#sum means 'sum' field key
|
||||
///
|
||||
/// It also turns out that the data after the delimiter does not necessairly
|
||||
/// It also turns out that the data after the delimiter does not necessarily
|
||||
/// escape the data.
|
||||
///
|
||||
/// So for example, the following is a valid field key value (for the
|
||||
|
|
|
|||
|
|
@ -839,13 +839,13 @@ mod tests {
|
|||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
// verify no empty record bacthes - bug #3782
|
||||
// verify no empty record batches - bug #3782
|
||||
assert_eq!(output_batches.len(), 2);
|
||||
assert_eq!(output_batches[0].num_rows(), 1);
|
||||
assert_eq!(output_batches[1].num_rows(), 1);
|
||||
|
||||
// verify compacted data
|
||||
// row with "tag1=UT" no longer avaialble
|
||||
// row with "tag1=UT" no longer available
|
||||
let expected = vec![
|
||||
"+-----------+------+-----------------------------+",
|
||||
"| field_int | tag1 | time |",
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ impl IngesterData {
|
|||
}
|
||||
|
||||
/// The Persister has a function to persist a given partition ID and to update the
|
||||
/// assocated shard's `min_unpersisted_sequence_number`.
|
||||
/// associated shard's `min_unpersisted_sequence_number`.
|
||||
#[async_trait]
|
||||
pub trait Persister: Send + Sync + 'static {
|
||||
/// Persits the partition ID. Will retry forever until it succeeds.
|
||||
|
|
@ -1143,7 +1143,7 @@ impl PartitionData {
|
|||
self.data.add_tombstone(tombstone.clone());
|
||||
|
||||
// ----------------------------------------------------------
|
||||
// First apply the tombstone on all in-memeory & non-persisting data
|
||||
// First apply the tombstone on all in-memory & non-persisting data
|
||||
// Make a QueryableBatch for all buffer + snapshots + the given tombstone
|
||||
let max_sequence_number = tombstone.sequence_number;
|
||||
let query_batch = match self
|
||||
|
|
@ -1265,12 +1265,12 @@ struct DataBuffer {
|
|||
pub(crate) persisting: Option<Arc<PersistingBatch>>,
|
||||
// Extra Notes:
|
||||
// . In MVP, we will only persist a set of snapshots at a time.
|
||||
// In later version, multiple perssiting operations may be happenning concurrently but
|
||||
// In later version, multiple persisting operations may be happening concurrently but
|
||||
// their persisted info must be added into the Catalog in their data
|
||||
// ingesting order.
|
||||
// . When a read request comes from a Querier, all data from `snaphots`
|
||||
// . When a read request comes from a Querier, all data from `snapshots`
|
||||
// and `persisting` must be sent to the Querier.
|
||||
// . After the `persiting` data is persisted and successfully added
|
||||
// . After the `persisting` data is persisted and successfully added
|
||||
// into the Catalog, it will be removed from this Data Buffer.
|
||||
// This data might be added into an extra cache to serve up to
|
||||
// Queriers that may not have loaded the parquet files from object
|
||||
|
|
@ -2551,10 +2551,10 @@ mod tests {
|
|||
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
|
||||
|
||||
// ------------------------------------------
|
||||
// Take snaphot of the `buffer`
|
||||
// Take snapshot of the `buffer`
|
||||
p.snapshot().unwrap();
|
||||
// verify data
|
||||
assert!(p.data.buffer.is_none()); // empty after snaphot
|
||||
assert!(p.data.buffer.is_none()); // empty after snapshot
|
||||
assert_eq!(p.data.snapshots.len(), 1); // data moved from buffer
|
||||
assert_eq!(p.data.deletes_during_persisting.len(), 1);
|
||||
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
|
||||
|
|
@ -2593,7 +2593,7 @@ mod tests {
|
|||
assert_eq!(p.data.snapshots.len(), 1); // new snapshot of the existing with delete applied
|
||||
assert_eq!(p.data.deletes_during_persisting.len(), 2); // one more tombstone added make it 2
|
||||
assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
|
||||
// snapshot has only 2 rows becasue the row with tem=60 was removed
|
||||
// snapshot has only 2 rows because the row with tem=60 was removed
|
||||
let data = (*p.data.snapshots[0].data).clone();
|
||||
let expected = vec![
|
||||
"+------------+-----+------+--------------------------------+",
|
||||
|
|
|
|||
|
|
@ -852,9 +852,9 @@ pub(crate) async fn make_one_partition_with_tombstones(
|
|||
let (mut p1, seq_num) =
|
||||
make_first_partition_data(partition_id, loc, shard_id, table_id, table_name);
|
||||
|
||||
// Add tombtones
|
||||
// Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new sanpshot after
|
||||
// appying the tombstone or (persisting) stay where they are and the tomstobes is kept to get applied later
|
||||
// Add tombstones
|
||||
// Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new snapshot after
|
||||
// applying the tombstone or (persisting) stay where they are and the tombstones is kept to get applied later
|
||||
// ------------------------------------------
|
||||
// Delete
|
||||
let mut seq_num = seq_num.get();
|
||||
|
|
|
|||
|
|
@ -185,7 +185,7 @@ pub trait Transaction: Send + Sync + Debug + sealed::TransactionFinalize + RepoC
|
|||
///
|
||||
/// # Error Handling
|
||||
///
|
||||
/// If successfull, all changes will be visible to other transactions.
|
||||
/// If successful, all changes will be visible to other transactions.
|
||||
///
|
||||
/// If an error is returned, the transaction may or or not be committed. This might be due to IO errors after the
|
||||
/// transaction was finished. However in either case, the transaction is atomic and can only succeed or fail
|
||||
|
|
|
|||
|
|
@ -1396,7 +1396,7 @@ impl ProcessedTombstoneRepo for MemTxn {
|
|||
.iter()
|
||||
.any(|pt| pt.tombstone_id == tombstone_id && pt.parquet_file_id == parquet_file_id)
|
||||
{
|
||||
// The tombstone was already proccessed for this file
|
||||
// The tombstone was already processed for this file
|
||||
return Err(Error::ProcessTombstoneExists {
|
||||
parquet_file_id: parquet_file_id.get(),
|
||||
tombstone_id: tombstone_id.get(),
|
||||
|
|
|
|||
|
|
@ -343,9 +343,9 @@ async fn new_raw_pool(
|
|||
let schema_name = schema_name.to_owned();
|
||||
Box::pin(async move {
|
||||
// Tag the connection with the provided application name, while allowing it to
|
||||
// be overriden from the connection string (aka DSN).
|
||||
// be override from the connection string (aka DSN).
|
||||
// If current_application_name is empty here it means the application name wasn't
|
||||
// set as part of the DSN, and we can set it explictly.
|
||||
// set as part of the DSN, and we can set it explicitly.
|
||||
// Recall that this block is running on connection, not when creating the pool!
|
||||
let current_application_name: String =
|
||||
sqlx::query_scalar("SELECT current_setting('application_name');")
|
||||
|
|
|
|||
|
|
@ -389,7 +389,7 @@ mod tests {
|
|||
};
|
||||
|
||||
// use something that has a later timestamp and expect the later one takes
|
||||
// precidence
|
||||
// precedence
|
||||
let l2 = FieldList {
|
||||
fields: vec![field1_later.clone()],
|
||||
};
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ fn make_schema_pivot_output_schema() -> DFSchemaRef {
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
/// Physical operator that implements the SchemaPivot operation aginst
|
||||
/// Physical operator that implements the SchemaPivot operation against
|
||||
/// data types
|
||||
pub struct SchemaPivotExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ impl TestCatalog {
|
|||
)
|
||||
}
|
||||
|
||||
/// Create a namesapce in the catalog
|
||||
/// Create a namespace in the catalog
|
||||
pub async fn create_namespace(self: &Arc<Self>, name: &str) -> Arc<TestNamespace> {
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
|
||||
|
|
|
|||
|
|
@ -155,12 +155,12 @@ impl From<InnerDmlError> for HttpDmlError {
|
|||
|
||||
/// Contains a request or a response.
|
||||
///
|
||||
/// This is used to be able to consume a reqest and transform it into a response if routing was successfull.
|
||||
/// This is used to be able to consume a request and transform it into a response if routing was successful.
|
||||
pub enum RequestOrResponse {
|
||||
/// Request still there, wasn't routed.
|
||||
Request(Request<Body>),
|
||||
|
||||
/// Request was consumed and transformed into a response object. Routing was successfull.
|
||||
/// Request was consumed and transformed into a response object. Routing was successful.
|
||||
Response(Response<Body>),
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,10 +36,10 @@ pub fn rewrite(expr: Expr) -> Result<Expr> {
|
|||
expr.rewrite(&mut IOxExprRewriter::new())
|
||||
}
|
||||
|
||||
/// Special purpose `Expr` rewrite rules for an Expr that is used as a predcate.
|
||||
/// Special purpose `Expr` rewrite rules for an Expr that is used as a predicate.
|
||||
///
|
||||
/// In general the rewrite rules in Datafusion and IOx attempt to
|
||||
/// preserve the sematics of an expression, especially with respect to
|
||||
/// preserve the semantics of an expression, especially with respect to
|
||||
/// nulls. This means that certain expressions can not be simplified
|
||||
/// (as they may become null)
|
||||
///
|
||||
|
|
|
|||
|
|
@ -172,11 +172,11 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
|
|||
span: Option<Span>,
|
||||
) -> Result<Vec<IngesterPartition>>;
|
||||
|
||||
/// Returns the most recent partition sstatus info across all ingester(s) for the specified
|
||||
/// Returns the most recent partition status info across all ingester(s) for the specified
|
||||
/// write token.
|
||||
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse>;
|
||||
|
||||
/// Return backend as [`Any`] which can be used to downcast to a specifc implementation.
|
||||
/// Return backend as [`Any`] which can be used to downcast to a specific implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -353,7 +353,7 @@ where
|
|||
|
||||
// This is the result of the compactor compacting files persisted by the ingester after persisted_max
|
||||
// The compacted result may include data of before and after persisted_max which prevents
|
||||
// this query to return correct result beacuse it only needs data before persist_max
|
||||
// this query to return correct result because it only needs data before persist_max
|
||||
if file.compaction_level() != CompactionLevel::Initial
|
||||
&& file.max_sequence_number() > persisted_max
|
||||
{
|
||||
|
|
|
|||
|
|
@ -527,7 +527,7 @@ pub enum InfluxColumnType {
|
|||
|
||||
/// Timestamp
|
||||
///
|
||||
/// 64 bit timestamp "UNIX timestamps" representing nanosecods
|
||||
/// 64 bit timestamp "UNIX timestamps" representing nanoseconds
|
||||
/// since the UNIX epoch (00:00:00 UTC on 1 January 1970).
|
||||
Timestamp,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ impl UdpCapture {
|
|||
self.socket_addr.port().to_string()
|
||||
}
|
||||
|
||||
/// stop and wait for succesful shutdown of this server
|
||||
/// stop and wait for successful shutdown of this server
|
||||
pub async fn stop(self) {
|
||||
self.token.cancel();
|
||||
if let Err(e) = self.join_handle.await {
|
||||
|
|
|
|||
Loading…
Reference in New Issue