fix: enable workspace lints on all crates, fix all lints (#25961)

Branch: refactor/shared-types-crate
Author: wayne, 2025-02-03 17:38:20 -07:00 (committed by GitHub)
Parent: ca25ab6f70
Commit: 27653f5a76
25 changed files with 106 additions and 82 deletions
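The change is mechanical but touches every crate: the workspace root defines one shared lint table, and each member crate opts in by adding [lints] workspace = true to its Cargo.toml, which is what the small manifest hunks below do. The root manifest is not part of this diff, so the following is only a minimal sketch of what such a table looks like; the lint names and levels are inferred from the kinds of fixes in this commit, not copied from the repository.

# Workspace root Cargo.toml (illustrative sketch, not the actual file)
[workspace.lints.rust]
unreachable_pub = "warn"
missing_debug_implementations = "warn"
missing_copy_implementations = "warn"
elided_lifetimes_in_paths = "warn"

[workspace.lints.clippy]
clone_on_ref_ptr = "deny"
dbg_macro = "deny"

With that in place, cargo clippy --workspace --all-targets applies the same lint set to every crate that opted in, and the remaining hunks in this commit are the resulting per-file fixes.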

View File

@ -5,6 +5,9 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
# Core Crates
authz.workspace = true

View File

@ -39,7 +39,7 @@ impl Authorizer for AllOrNothingAuthorizer {
}
/// The default [`Authorizer`] implementation that will authorize all requests
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct DefaultAuthorizer;
#[async_trait]

View File

@ -48,23 +48,23 @@ impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L> {
}
}
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct NoWriteBuf;
#[derive(Debug)]
pub struct WithWriteBuf(Arc<dyn WriteBuffer>);
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct NoQueryExec;
#[derive(Debug)]
pub struct WithQueryExec(Arc<dyn QueryExecutor>);
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct NoPersister;
#[derive(Debug)]
pub struct WithPersister(Arc<Persister>);
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct NoTimeProvider;
#[derive(Debug)]
pub struct WithTimeProvider<T>(Arc<T>);
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
pub struct NoListener;
#[derive(Debug)]
pub struct WithListener(TcpListener);
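
The NoWriteBuf/NoQueryExec/NoPersister/NoTimeProvider/NoListener types above are zero-sized type-state markers for the builder, so deriving Clone and Copy on them costs nothing, while the With* states own real resources (Arc handles, a TcpListener) and stay non-Copy. The extra derives are most likely what rustc's allow-by-default missing_copy_implementations lint asks for once the workspace table enables it; that is an inference from the diff, not something stated in it. The same reasoning applies to the Copy derives on DefaultAuthorizer above and ValidateDbNameError further down. A minimal sketch of the pattern, with simplified names:

use std::net::TcpListener;

// A zero-sized "not yet configured" marker can derive Copy for free...
#[derive(Clone, Copy, Debug)]
pub struct NoListener;

// ...while the configured state owns a resource and stays non-Copy.
#[derive(Debug)]
pub struct WithListener(TcpListener);

#[derive(Debug)]
pub struct ServerBuilder<L> {
    listener: L,
}

impl ServerBuilder<NoListener> {
    pub fn new() -> Self {
        Self { listener: NoListener }
    }
}

impl<L> ServerBuilder<L> {
    // Consuming self moves the builder from one type state to the next.
    pub fn with_listener(self, listener: TcpListener) -> ServerBuilder<WithListener> {
        ServerBuilder { listener: WithListener(listener) }
    }
}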

View File

@ -231,7 +231,7 @@ pub enum Error {
}
#[derive(Debug, Error)]
pub enum AuthorizationError {
pub(crate) enum AuthorizationError {
#[error("the request was not authorized")]
Unauthorized,
#[error("the request was not in the form of 'Authorization: Bearer <token>'")]
@ -459,7 +459,7 @@ impl Error {
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub(crate) struct HttpApi<T> {
@ -1466,7 +1466,7 @@ fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameErr
Ok(())
}
#[derive(Debug, thiserror::Error)]
#[derive(Clone, Copy, Debug, thiserror::Error)]
pub enum ValidateDbNameError {
#[error(
"invalid character in database name: must be ASCII, \

View File

@ -39,7 +39,7 @@ pub(crate) fn hybrid<MakeRest, Grpc>(
/// on a single port.
///
/// [hyper-server]: https://docs.rs/hyper/0.14.28/hyper/server/struct.Server.html
pub struct HybridMakeService<MakeRest, Grpc> {
pub(crate) struct HybridMakeService<MakeRest, Grpc> {
make_rest: MakeRest,
grpc: Grpc,
}
@ -67,7 +67,7 @@ where
pin_project! {
/// A future that builds a new `Service` for serving REST requests or gRPC requests
pub struct HybridMakeServiceFuture<RestFuture, Grpc> {
pub(crate) struct HybridMakeServiceFuture<RestFuture, Grpc> {
#[pin]
rest_future: RestFuture,
grpc: Option<Grpc>,
@ -94,7 +94,7 @@ where
}
/// The service that can serve both gRPC and REST HTTP Requests
pub struct HybridService<Rest, Grpc> {
pub(crate) struct HybridService<Rest, Grpc> {
rest: Rest,
grpc: Grpc,
}

View File

@ -26,8 +26,8 @@ use crate::system_tables::python_call::ProcessingEngineTriggerTable;
mod python_call;
mod queries;
pub const SYSTEM_SCHEMA_NAME: &str = "system";
pub const TABLE_NAME_PREDICATE: &str = "table_name";
pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system";
pub(crate) const TABLE_NAME_PREDICATE: &str = "table_name";
pub(crate) const QUERIES_TABLE_NAME: &str = "queries";
pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
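
The visibility changes in this and the surrounding files (pub narrowed to pub(crate) or pub(super)) follow one pattern: the items live in private modules, so marking them pub promises an API surface that code outside the crate can never actually reach. That is exactly what rustc's unreachable_pub lint flags once the workspace enables it (again an inference from the diff, which does not show the lint list). A self-contained sketch modeled on the system tables module above:

mod system_tables {
    // `pub` here is misleading: the `system_tables` module itself is private,
    // so this constant is unreachable from outside the crate. unreachable_pub
    // asks for pub(crate) (or a genuine public re-export) instead.
    pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system";
}

fn main() {
    // Still freely usable anywhere inside the crate:
    println!("{}", system_tables::SYSTEM_SCHEMA_NAME);
}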

View File

@ -14,7 +14,7 @@ pub(super) struct ProcessingEngineTriggerTable {
}
impl ProcessingEngineTriggerTable {
pub fn new(triggers: Vec<TriggerDefinition>) -> Self {
pub(super) fn new(triggers: Vec<TriggerDefinition>) -> Self {
Self {
schema: trigger_schema(),
triggers,

View File

@ -5,6 +5,8 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
# core crates

View File

@ -91,7 +91,7 @@ impl ToRecordBatch<SampleSysEvent> for SampleSysEvent {
}
impl SampleSysEvent {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
let rand_start_range = 0..100_000_000;
let start = rand::thread_rng().gen_range(rand_start_range);
SampleSysEvent {
@ -197,7 +197,7 @@ impl ToRecordBatch<SampleSysEvent2> for SampleSysEvent2 {
}
impl SampleSysEvent2 {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
let rand_start_range = 0..100_000_000;
let start = rand::thread_rng().gen_range(rand_start_range);
SampleSysEvent2 {
@ -293,7 +293,7 @@ impl ToRecordBatch<SampleSysEvent3> for SampleSysEvent3 {
}
impl SampleSysEvent3 {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
let rand_start_range = 0..100_000_000;
let start = rand::thread_rng().gen_range(rand_start_range);
SampleSysEvent3 {

View File

@ -109,6 +109,7 @@ impl SysEventStore {
// sense to use heap.
pub type RingBuffer<T> = RingBufferVec<T>;
#[derive(Debug)]
pub struct RingBufferVec<T> {
buf: Vec<T>,
max: usize,
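
RingBufferVec is public but previously had no Debug implementation; the added derive is consistent with rustc's missing_debug_implementations lint, which is allow-by-default and so presumably turned on by the new workspace table (the diff itself does not confirm this). The same derive is added to ParquetBytes, QueryableBufferArgs, and the WriteValidator state types later in the commit. A short sketch of what the derive buys, using the fields shown above:

#[derive(Debug)]
pub struct RingBufferVec<T> {
    buf: Vec<T>,
    max: usize,
}

fn main() {
    let rb = RingBufferVec { buf: vec![1, 2, 3], max: 8 };
    // Without the derive this line would not compile, and the lint would
    // flag the public type at its definition.
    println!("{rb:?}");
}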

View File

@ -5,6 +5,9 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
tokio.workspace = true
serde.workspace = true

View File

@ -16,16 +16,16 @@ pub(crate) struct EventsBucket {
}
impl EventsBucket {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self::default()
}
pub fn add_write_sample(&mut self, num_lines: usize, size_bytes: usize) {
pub(crate) fn add_write_sample(&mut self, num_lines: usize, size_bytes: usize) {
self.writes.add_sample(num_lines, size_bytes);
self.num_writes += 1;
}
pub fn update_num_queries(&mut self) {
pub(crate) fn update_num_queries(&mut self) {
self.queries.add_sample();
self.num_queries += 1;
}
@ -48,7 +48,7 @@ pub(crate) struct PerMinuteWrites {
}
impl PerMinuteWrites {
pub fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
pub(crate) fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
let new_num_lines = num_lines as u64;
self.lines.update(new_num_lines);
self.total_lines += new_num_lines;
@ -67,7 +67,7 @@ pub(crate) struct PerMinuteReads {
}
impl PerMinuteReads {
pub fn add_sample(&mut self) -> Option<()> {
pub(crate) fn add_sample(&mut self) -> Option<()> {
self.num_queries.update(1);
self.total_num_queries += 1;
Some(())

View File

@ -17,7 +17,7 @@ pub(crate) struct Writes {
}
impl Writes {
pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
pub(crate) fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
let num_writes = events_bucket.num_writes as u64;
self.lines.update(&events_bucket.writes.lines);
self.size_bytes.update(&events_bucket.writes.size_bytes);
@ -28,7 +28,7 @@ impl Writes {
Some(())
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
self.lines.reset();
self.size_bytes.reset();
self.num_writes.reset();
@ -46,13 +46,13 @@ pub(crate) struct Queries {
}
impl Queries {
pub fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
pub(crate) fn add_sample(&mut self, events_bucket: &EventsBucket) -> Option<()> {
self.num_queries.update(events_bucket.num_queries as u64);
self.total_num_queries += events_bucket.queries.total_num_queries;
Some(())
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
self.num_queries.reset();
self.total_num_queries = 0;
}
@ -64,11 +64,11 @@ pub(crate) struct Cpu {
}
impl Cpu {
pub fn add_sample(&mut self, cpu_used: f32) -> Option<()> {
pub(crate) fn add_sample(&mut self, cpu_used: f32) -> Option<()> {
self.utilization.update(cpu_used)
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
self.utilization.reset();
}
}
@ -79,11 +79,11 @@ pub(crate) struct Memory {
}
impl Memory {
pub fn add_sample(&mut self, mem_used: u64) -> Option<()> {
pub(crate) fn add_sample(&mut self, mem_used: u64) -> Option<()> {
self.usage.update(mem_used)
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
self.usage.reset();
}
}

View File

@ -9,7 +9,7 @@ use crate::store::TelemetryStore;
use crate::Result;
#[cfg_attr(test, automock)]
pub trait SystemInfoProvider: Send + Sync + 'static {
pub(crate) trait SystemInfoProvider: Send + Sync + 'static {
fn refresh_metrics(&mut self, pid: Pid);
fn get_pid(&self) -> Result<Pid, &'static str>;
@ -22,7 +22,7 @@ struct SystemInfo {
}
impl SystemInfo {
pub fn new() -> SystemInfo {
pub(crate) fn new() -> SystemInfo {
Self {
system: System::new(),
}
@ -60,13 +60,13 @@ struct CpuAndMemorySampler {
}
impl CpuAndMemorySampler {
pub fn new(system: impl SystemInfoProvider) -> Self {
pub(crate) fn new(system: impl SystemInfoProvider) -> Self {
Self {
system: Box::new(system),
}
}
pub fn get_cpu_and_mem_used(&mut self) -> Option<(f32, u64)> {
pub(crate) fn get_cpu_and_mem_used(&mut self) -> Option<(f32, u64)> {
let pid = self.system.get_pid().ok()?;
self.system.refresh_metrics(pid);
let (cpu_used, memory_used) = self.system.get_process_specific_metrics(pid)?;

View File

@ -13,7 +13,7 @@ pub(crate) struct TelemetrySender {
}
impl TelemetrySender {
pub fn new(client: reqwest::Client, base_url: impl IntoUrl) -> Self {
pub(crate) fn new(client: reqwest::Client, base_url: impl IntoUrl) -> Self {
let base_url: Url = base_url
.into_url()
.expect("Cannot parse telemetry sender url");
@ -25,7 +25,7 @@ impl TelemetrySender {
}
}
pub async fn try_sending(&mut self, telemetry: &TelemetryPayload) -> Result<()> {
pub(crate) async fn try_sending(&mut self, telemetry: &TelemetryPayload) -> Result<()> {
debug!(telemetry = ?telemetry, "trying to send data to telemetry server");
let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?;
self.client

View File

@ -27,7 +27,7 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> RollingStats<T> {
/// is updated locally here to calculate the rolling average for usually
/// an hour for a metric. Refer to [`crate::metrics::Writes`] or
/// [`crate::metrics::Queries`] to see how it is used
pub fn update(&mut self, higher_precision_stats: &Stats<T>) -> Option<()> {
pub(crate) fn update(&mut self, higher_precision_stats: &Stats<T>) -> Option<()> {
if self.num_samples == 0 {
self.min = higher_precision_stats.min;
self.max = higher_precision_stats.max;
@ -50,7 +50,7 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> RollingStats<T> {
Some(())
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
*self = RollingStats::default();
}
}
@ -68,7 +68,7 @@ pub(crate) struct Stats<T> {
impl<T: Default + Num + Copy + NumCast + PartialOrd> Stats<T> {
/// Update the [`Self::min`]/[`Self::max`]/[`Self::avg`] from a
/// new value that is sampled.
pub fn update(&mut self, new_val: T) -> Option<()> {
pub(crate) fn update(&mut self, new_val: T) -> Option<()> {
if self.num_samples == 0 {
self.min = new_val;
self.max = new_val;
@ -84,7 +84,7 @@ impl<T: Default + Num + Copy + NumCast + PartialOrd> Stats<T> {
Some(())
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
*self = Stats::default();
}
}

View File

@ -60,10 +60,14 @@ impl TelemetryStore {
});
if !cfg!(test) {
sample_metrics(store.clone(), Duration::from_secs(SAMPLER_INTERVAL_SECS)).await;
sample_metrics(
Arc::clone(&store),
Duration::from_secs(SAMPLER_INTERVAL_SECS),
)
.await;
send_telemetry_in_background(
telemetry_endpoint,
store.clone(),
Arc::clone(&store),
Duration::from_secs(MAIN_SENDER_INTERVAL_SECS),
)
.await;
@ -158,7 +162,7 @@ struct TelemetryStoreInner {
}
impl TelemetryStoreInner {
pub fn new(
fn new(
instance_id: Arc<str>,
os: Arc<str>,
influx_version: Arc<str>,
@ -188,10 +192,10 @@ impl TelemetryStoreInner {
"Snapshot write size in bytes"
);
TelemetryPayload {
os: self.os.clone(),
version: self.influx_version.clone(),
instance_id: self.instance_id.clone(),
storage_type: self.storage_type.clone(),
os: Arc::clone(&self.os),
version: Arc::clone(&self.influx_version),
instance_id: Arc::clone(&self.instance_id),
storage_type: Arc::clone(&self.storage_type),
cores: self.cores,
product_type: "Core",
uptime_secs: self.start_timer.elapsed().as_secs(),
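
The store.clone() and self.os.clone() rewrites above, and the similar ones in the persister and write buffer files below, all target reference-counted pointers (Arc<TelemetryStore>, Arc<str>, and friends). Arc::clone(&x) is behaviorally identical to x.clone(), but it makes explicit at the call site that only a reference count is incremented, which is what clippy's clone_on_ref_ptr restriction lint enforces if it is in the workspace set (inferred, not shown). A minimal sketch:

use std::sync::Arc;

fn main() {
    let store: Arc<str> = Arc::from("telemetry");

    // store.clone() compiles, but reads like a possibly deep copy of the data;
    // Arc::clone names the cheap pointer clone explicitly.
    let handle = Arc::clone(&store);

    assert_eq!(Arc::strong_count(&store), 2);
    drop(handle);
}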

View File

@ -5,6 +5,9 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[features]
"system-py" = ["influxdb3_py_api/system-py", "pyo3"]

View File

@ -113,7 +113,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
&self,
db_id: DbId,
table_id: TableId,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
) -> Vec<ParquetFile>;
/// A channel to watch for when new persisted snapshots are created
@ -127,7 +127,7 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
&self,
db_schema: Arc<DatabaseSchema>,
table_def: Arc<TableDefinition>,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;
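
ChunkFilter evidently carries a lifetime parameter, so writing the parameter as &ChunkFilter relies on lifetime elision inside a type path; rustc's elided_lifetimes_in_paths lint (again, presumably part of the workspace set) requires the elision to be spelled out as ChunkFilter<'_>. The same change appears for ParsedLine<'_> in the write validator near the end of the commit. A small sketch, with a stand-in definition since ChunkFilter itself is not part of this diff:

// Stand-in type: the real ChunkFilter lives elsewhere in influxdb3_write.
struct ChunkFilter<'a> {
    table_name: &'a str,
}

// Before: `filter: &ChunkFilter` compiles but hides the borrow.
// After: the anonymous lifetime keeps the elision while making it visible.
fn matches(filter: &ChunkFilter<'_>, name: &str) -> bool {
    filter.table_name == name
}

fn main() {
    let filter = ChunkFilter { table_name: "cpu" };
    assert!(matches(&filter, "cpu"));
}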

View File

@ -352,7 +352,7 @@ impl Persister {
/// Returns the configured `ObjectStore` that data is loaded from and persisted to.
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
self.object_store.clone()
Arc::clone(&self.object_store)
}
pub fn as_any(&self) -> &dyn Any {
@ -391,6 +391,7 @@ pub async fn serialize_to_parquet(
})
}
#[derive(Debug)]
pub struct ParquetBytes {
pub bytes: Bytes,
pub meta_data: FileMetaData,
@ -615,12 +616,12 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
let catalog = Catalog::new(node_id.clone(), instance_id.clone());
let catalog = Catalog::new(Arc::clone(&node_id), Arc::clone(&instance_id));
let _ = catalog.db_or_create("my_db");
persister.persist_catalog(&catalog).await.unwrap();
let catalog = Catalog::new(node_id.clone(), instance_id.clone());
let catalog = Catalog::new(Arc::clone(&node_id), Arc::clone(&instance_id));
let _ = catalog.db_or_create("my_second_db");
persister.persist_catalog(&catalog).await.unwrap();
@ -920,13 +921,13 @@ mod tests {
let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
let stream_builder = RecordBatchReceiverStreamBuilder::new(Arc::clone(&schema), 5);
let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array)]).unwrap();
let id_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array)]).unwrap();
stream_builder.tx().send(Ok(batch1)).await.unwrap();
stream_builder.tx().send(Ok(batch2)).await.unwrap();
@ -948,13 +949,13 @@ mod tests {
let persister = Persister::new(Arc::new(local_disk), "test_host", time_provider);
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
let stream_builder = RecordBatchReceiverStreamBuilder::new(Arc::clone(&schema), 5);
let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array)]).unwrap();
let id_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array)]).unwrap();
stream_builder.tx().send(Ok(batch1)).await.unwrap();
stream_builder.tx().send(Ok(batch2)).await.unwrap();

View File

@ -327,7 +327,7 @@ impl WriteBufferImpl {
&self,
db_schema: Arc<DatabaseSchema>,
table_def: Arc<TableDefinition>,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
@ -409,7 +409,7 @@ impl WriteBufferImpl {
&self,
database_name: &str,
table_name: &str,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Vec<Arc<dyn QueryChunk>> {
@ -503,7 +503,7 @@ impl Bufferer for WriteBufferImpl {
&self,
db_id: DbId,
table_id: TableId,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
) -> Vec<ParquetFile> {
self.buffer.persisted_parquet_files(db_id, table_id, filter)
}
@ -518,7 +518,7 @@ impl ChunkContainer for WriteBufferImpl {
&self,
db_schema: Arc<DatabaseSchema>,
table_def: Arc<TableDefinition>,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> crate::Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
@ -546,7 +546,7 @@ impl DistinctCacheManager for WriteBufferImpl {
let catalog_op = CatalogOp::CreateDistinctCache(new_cache_definition.clone());
let catalog_batch = CatalogBatch {
database_id: db_schema.id,
database_name: db_schema.name.clone(),
database_name: Arc::clone(&db_schema.name),
time_ns: self.time_provider.now().timestamp_nanos(),
ops: vec![catalog_op],
};
@ -1652,7 +1652,7 @@ mod tests {
// there should be one snapshot already, i.e., the one we created above:
verify_snapshot_count(1, &wbuf.persister).await;
// there is only one initial catalog so far:
verify_catalog_count(1, object_store.clone()).await;
verify_catalog_count(1, Arc::clone(&object_store)).await;
// do three writes to force a new snapshot
wbuf.write_lp(
@ -1694,7 +1694,7 @@ mod tests {
wbuf.wal.last_snapshot_sequence_number().await
);
// There should be a catalog now, since the above writes updated the catalog
verify_catalog_count(2, object_store.clone()).await;
verify_catalog_count(2, Arc::clone(&object_store)).await;
// Check the catalog sequence number in the latest snapshot is correct:
let persisted_snapshot_bytes = object_store
.get(&SnapshotInfoFilePath::new(
@ -2146,10 +2146,10 @@ mod tests {
// Set DbId to a large number to make sure it is properly set on replay
// and assert that it's what we expect it to be before we replay
dbg!(DbId::next_id());
debug!(db_id = ?DbId::next_id());
DbId::from(10_000).set_next_id();
assert_eq!(DbId::next_id().as_u32(), 10_000);
dbg!(DbId::next_id());
debug!(db_id = ?DbId::next_id());
let (_wbuf, _, _) = setup(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
@ -2161,7 +2161,7 @@ mod tests {
},
)
.await;
dbg!(DbId::next_id());
debug!(db_id = ?DbId::next_id());
assert_eq!(DbId::next_id().as_u32(), 1);
}
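
The dbg! calls in this test were replaced with debug! events. dbg! prints to stderr unconditionally and is meant for throwaway debugging, so lints such as clippy::dbg_macro (a plausible member of the new workspace set) reject it; the structured debug! form used in the diff looks like tracing's macro, possibly re-exported through an internal observability crate. A sketch of the swap, assuming a plain tracing dependency:

use tracing::debug; // sketch: tracing = "0.1" in Cargo.toml

fn next_id() -> u32 {
    10_000
}

fn main() {
    // Previously: dbg!(next_id());  // always writes to stderr
    // Now: an event that is silent unless a tracing subscriber is installed,
    // and that carries the value as a structured field.
    debug!(db_id = ?next_id());
}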
@ -3125,7 +3125,7 @@ mod tests {
time_seconds: i64,
}
async fn do_writes<W: WriteBuffer, LP: AsRef<str>>(
async fn do_writes<W: WriteBuffer, LP: AsRef<str> + Send + Sync>(
db: &'static str,
buffer: &W,
writes: &[TestWrite<LP>],
@ -3145,7 +3145,7 @@ mod tests {
}
}
async fn do_writes_partial<W: WriteBuffer, LP: AsRef<str>>(
async fn do_writes_partial<W: WriteBuffer, LP: AsRef<str> + Send + Sync>(
db: &'static str,
buffer: &W,
writes: &[TestWrite<LP>],
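
The extra Send + Sync bounds on the line-protocol generic in these async test helpers make the returned futures thread-safe: references to LP values are held across .await points, so the future is only Send when LP: Sync, and Send on LP matters as soon as owned values are moved into the future or a spawned task. The diff does not say which lint drove this; clippy::future_not_send is a plausible candidate. A sketch with a hypothetical assert_send helper:

fn assert_send<T: Send>(_t: T) {}

async fn write_line(_lp: &str) {}

async fn do_writes<LP: AsRef<str> + Send + Sync>(writes: &[LP]) {
    for w in writes {
        // `w` is a &LP that lives across this await, so the future is Send
        // only if &LP: Send, i.e. LP: Sync.
        write_line(w.as_ref()).await;
    }
}

fn main() {
    let lines = vec![String::from("cpu,host=a usage=0.5 1")];
    // The future is only type-checked here, not run; String is Send + Sync,
    // so this compiles.
    assert_send(do_writes(&lines[..]));
}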

View File

@ -54,7 +54,7 @@ impl PersistedFiles {
&self,
db_id: DbId,
table_id: TableId,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
) -> Vec<ParquetFile> {
let inner = self.inner.read();
let mut files = inner
@ -98,7 +98,9 @@ struct Inner {
}
impl Inner {
pub fn new_from_persisted_snapshots(persisted_snapshots: Vec<PersistedSnapshot>) -> Self {
pub(crate) fn new_from_persisted_snapshots(
persisted_snapshots: Vec<PersistedSnapshot>,
) -> Self {
let mut file_count = 0;
let mut size_in_mb = 0.0;
let mut row_count = 0;
@ -123,7 +125,7 @@ impl Inner {
}
}
pub fn add_persisted_snapshot(&mut self, persisted_snapshot: PersistedSnapshot) {
pub(crate) fn add_persisted_snapshot(&mut self, persisted_snapshot: PersistedSnapshot) {
self.parquet_files_row_count += persisted_snapshot.row_count;
self.parquet_files_size_mb += as_mb(persisted_snapshot.parquet_size_bytes);
let file_count =
@ -131,7 +133,7 @@ impl Inner {
self.parquet_files_count += file_count;
}
pub fn add_persisted_file(
pub(crate) fn add_persisted_file(
&mut self,
db_id: &DbId,
table_id: &TableId,

View File

@ -52,6 +52,7 @@ pub struct QueryableBuffer {
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
}
#[derive(Debug)]
pub struct QueryableBufferArgs {
pub executor: Arc<Executor>,
pub catalog: Arc<Catalog>,
@ -95,7 +96,7 @@ impl QueryableBuffer {
&self,
db_schema: Arc<DatabaseSchema>,
table_def: Arc<TableDefinition>,
buffer_filter: &ChunkFilter,
buffer_filter: &ChunkFilter<'_>,
_projection: Option<&Vec<usize>>,
_ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
@ -411,7 +412,7 @@ impl QueryableBuffer {
&self,
db_id: DbId,
table_id: TableId,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
) -> Vec<ParquetFile> {
self.persisted_files
.get_files_filtered(db_id, table_id, filter)

View File

@ -31,7 +31,7 @@ pub enum Error {
RecordBatchError(#[from] arrow::error::ArrowError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
pub struct TableBuffer {
chunk_time_to_chunks: BTreeMap<i64, MutableTableChunk>,
@ -73,7 +73,7 @@ impl TableBuffer {
pub fn partitioned_record_batches(
&self,
table_def: Arc<TableDefinition>,
filter: &ChunkFilter,
filter: &ChunkFilter<'_>,
) -> Result<HashMap<i64, (TimestampMinMax, Vec<RecordBatch>)>> {
let mut batches = HashMap::new();
let schema = table_def.schema.as_arrow();
@ -92,7 +92,7 @@ impl TableBuffer {
})
.collect();
let cols = cols?;
let rb = RecordBatch::try_new(schema.clone(), cols)?;
let rb = RecordBatch::try_new(Arc::clone(&schema), cols)?;
let (ts, v) = batches
.entry(sc.chunk_time)
.or_insert_with(|| (sc.timestamp_min_max, Vec::new()));
@ -506,7 +506,7 @@ impl std::fmt::Debug for MutableTableChunk {
}
}
pub enum Builder {
pub(super) enum Builder {
Bool(BooleanBuilder),
I64(Int64Builder),
F64(Float64Builder),

View File

@ -20,6 +20,7 @@ use super::Error;
/// Type state for the [`WriteValidator`] after it has been initialized
/// with the catalog.
#[derive(Debug)]
pub struct WithCatalog {
catalog: Arc<Catalog>,
db_schema: Arc<DatabaseSchema>,
@ -28,6 +29,7 @@ pub struct WithCatalog {
/// Type state for the [`WriteValidator`] after it has parsed v1 or v3
/// line protocol.
#[derive(Debug)]
pub struct LinesParsed {
catalog: WithCatalog,
lines: Vec<QualifiedLine>,
@ -48,6 +50,7 @@ impl LinesParsed {
/// A state machine for validating v1 or v3 line protocol and updating
/// the [`Catalog`] with new tables or schema changes.
#[derive(Debug)]
pub struct WriteValidator<State> {
state: State,
}
@ -166,7 +169,7 @@ type ColumnTracker = Vec<(ColumnId, Arc<str>, InfluxColumnType)>;
fn validate_and_qualify_v1_line(
db_schema: &mut Cow<'_, DatabaseSchema>,
line_number: usize,
line: ParsedLine,
line: ParsedLine<'_>,
ingest_time: Time,
precision: Precision,
) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
@ -468,6 +471,7 @@ fn convert_qualified_line(
table_chunks.push_row(chunk_time, line.row);
}
#[derive(Debug)]
struct QualifiedLine {
table_id: TableId,
row: Row,