refactor: add arc clone lint

pull/24376/head
Edd Robinson 2021-02-15 12:35:21 +00:00
parent e9f1b0f3e2
commit 8eaf006c27
12 changed files with 87 additions and 103 deletions

View File

@ -302,7 +302,7 @@ impl Executor {
let handles = plans
.into_iter()
.map(|plan| {
let counters = self.counters.clone();
let counters = Arc::clone(&self.counters);
tokio::task::spawn(async move {
let ctx = IOxExecutionContext::new(counters);
@ -344,7 +344,7 @@ impl Executor {
/// Create a new execution context, suitable for executing a new query
pub fn new_context(&self) -> IOxExecutionContext {
IOxExecutionContext::new(self.counters.clone())
IOxExecutionContext::new(Arc::clone(&self.counters))
}
/// plans and runs the plans in parallel and collects the results
@ -414,7 +414,7 @@ mod tests {
#[tokio::test]
async fn executor_known_string_set_plan_ok() -> Result<()> {
let expected_strings = to_set(&["Foo", "Bar"]);
let plan = StringSetPlan::Known(expected_strings.clone());
let plan = StringSetPlan::Known(Arc::clone(&expected_strings));
let executor = Executor::default();
let result_strings = executor.to_string_set(plan).await?;
@ -442,8 +442,8 @@ mod tests {
// Test with a single plan that produces one record batch
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let data = to_string_array(&["foo", "bar", "baz", "foo"]);
let batch =
RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch");
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])
.expect("created new record batch");
let scan = make_plan(schema, vec![batch]);
let plan: StringSetPlan = vec![scan].into();
@ -460,11 +460,11 @@ mod tests {
// Test with a single plan that produces multiple record batches
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let data1 = to_string_array(&["foo", "bar"]);
let batch1 =
RecordBatch::try_new(schema.clone(), vec![data1]).expect("created new record batch");
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![data1])
.expect("created new record batch");
let data2 = to_string_array(&["baz", "foo"]);
let batch2 =
RecordBatch::try_new(schema.clone(), vec![data2]).expect("created new record batch");
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![data2])
.expect("created new record batch");
let scan = make_plan(schema, vec![batch1, batch2]);
let plan: StringSetPlan = vec![scan].into();
@ -482,13 +482,13 @@ mod tests {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let data1 = to_string_array(&["foo", "bar"]);
let batch1 =
RecordBatch::try_new(schema.clone(), vec![data1]).expect("created new record batch");
let scan1 = make_plan(schema.clone(), vec![batch1]);
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![data1])
.expect("created new record batch");
let scan1 = make_plan(Arc::clone(&schema), vec![batch1]);
let data2 = to_string_array(&["baz", "foo"]);
let batch2 =
RecordBatch::try_new(schema.clone(), vec![data2]).expect("created new record batch");
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![data2])
.expect("created new record batch");
let scan2 = make_plan(schema, vec![batch2]);
let plan: StringSetPlan = vec![scan1, scan2].into();
@ -510,8 +510,8 @@ mod tests {
builder.append_value("foo").unwrap();
builder.append_null().unwrap();
let data = Arc::new(builder.finish());
let batch =
RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch");
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])
.expect("created new record batch");
let scan = make_plan(schema, vec![batch]);
let plan: StringSetPlan = vec![scan].into();
@ -538,8 +538,8 @@ mod tests {
// Ensure that an incorect schema (an int) gives a reasonable error
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let data = Arc::new(Int64Array::from(vec![1]));
let batch =
RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch");
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])
.expect("created new record batch");
let scan = make_plan(schema, vec![batch]);
let plan: StringSetPlan = vec![scan].into();
@ -571,7 +571,7 @@ mod tests {
Field::new("f2", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
to_string_array(&["foo", "bar"]),
to_string_array(&["baz", "bzz"]),

View File

@ -63,7 +63,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
.map(|schema_pivot| {
assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
let execution_plan = Arc::new(SchemaPivotExec::new(
inputs[0].clone(),
Arc::clone(&inputs[0]),
schema_pivot.schema().as_ref().clone().into(),
));
Ok(execution_plan as _)

View File

@ -1,7 +1,7 @@
//! This module contains the definition of a "FieldList" a set of
//! records of (field_name, field_type, last_timestamp) and code to
//! pull them from RecordBatches
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};
use arrow_deps::arrow::{
self,
@ -79,7 +79,7 @@ impl IntoFieldList for Vec<RecordBatch> {
arrow_schema
.index_of(TIME_COLUMN_NAME)
.with_context(|| InternalNoTimeColumn {
schema: arrow_schema.clone(),
schema: Arc::clone(&arrow_schema),
})?;
// key: fieldname, value: highest value of time column we have seen
@ -203,8 +203,11 @@ mod tests {
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz", "foo"]));
let timestamp_array: ArrayRef = Arc::new(Int64Array::from(vec![1000, 2000, 3000, 4000]));
let actual = do_conversion(schema.clone(), vec![vec![string_array, timestamp_array]])
.expect("convert correctly");
let actual = do_conversion(
Arc::clone(&schema),
vec![vec![string_array, timestamp_array]],
)
.expect("convert correctly");
let expected = FieldList {
fields: vec![Field {
@ -338,7 +341,7 @@ mod tests {
let batches = value_arrays
.into_iter()
.map(|arrays| {
RecordBatch::try_new(schema.clone(), arrays).expect("created new record batch")
RecordBatch::try_new(Arc::clone(&schema), arrays).expect("created new record batch")
})
.collect::<Vec<_>>();

View File

@ -164,7 +164,7 @@ impl ExecutionPlan for SchemaPivotExec {
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
fn output_partitioning(&self) -> Partitioning {
@ -176,7 +176,7 @@ impl ExecutionPlan for SchemaPivotExec {
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
vec![Arc::clone(&self.input)]
}
fn with_new_children(
@ -185,8 +185,8 @@ impl ExecutionPlan for SchemaPivotExec {
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(Self {
input: children[0].clone(),
schema: self.schema.clone(),
input: Arc::clone(&children[0]),
schema: Arc::clone(&self.schema),
})),
_ => Err(DataFusionError::Internal(
"SchemaPivotExec wrong number of children".to_string(),
@ -512,7 +512,7 @@ mod tests {
.map(|test_batch| {
let a_vec = test_batch.a.iter().copied().collect::<Vec<_>>();
RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(Int64Array::from(a_vec)),
to_string_array(test_batch.b),

View File

@ -234,7 +234,7 @@ impl SeriesSetConverter {
.iter()
.map(|end_row| {
let series_set = SeriesSet {
table_name: table_name.clone(),
table_name: Arc::clone(&table_name),
tags: Self::get_tag_keys(
&batch,
start_row as usize,
@ -336,7 +336,7 @@ impl SeriesSetConverter {
.expect("Tag column was a String")
.value(row)
.into();
(column_name.clone(), Arc::new(tag_value))
(Arc::clone(&column_name), Arc::new(tag_value))
})
.collect()
}
@ -917,7 +917,7 @@ mod tests {
}
fn parse_to_iterator(schema: SchemaRef, data: &str) -> SendableRecordBatchStream {
let batch = parse_to_record_batch(schema.clone(), data);
let batch = parse_to_record_batch(Arc::clone(&schema), data);
Box::pin(SizedRecordBatchStream::new(schema, vec![Arc::new(batch)]))
}
}

View File

@ -66,7 +66,7 @@ impl IntoStringSet for Vec<RecordBatch> {
ensure!(
fields.len() == 1,
InternalSchemaWasNotString {
schema: schema.clone(),
schema: Arc::clone(&schema),
}
);
@ -75,7 +75,7 @@ impl IntoStringSet for Vec<RecordBatch> {
ensure!(
field.data_type() == &DataType::Utf8,
InternalSchemaWasNotString {
schema: schema.clone(),
schema: Arc::clone(&schema),
}
);

View File

@ -206,13 +206,13 @@ where
let input_signature = Signature::Exact(vec![value_data_type.clone(), DataType::Int64]);
let state_type = Arc::new(vec![value_data_type.clone(), DataType::Int64]);
let state_type_factory: StateTypeFunction = Arc::new(move |_| Ok(state_type.clone()));
let state_type_factory: StateTypeFunction = Arc::new(move |_| Ok(Arc::clone(&state_type)));
let factory: AccumulatorFunctionImplementation =
Arc::new(move || Ok(Box::new(SelectorAccumulator::<SELECTOR>::new(output))));
let return_type = Arc::new(output.return_type(&value_data_type));
let return_type_func: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone()));
let return_type_func: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::clone(&return_type)));
AggregateUDF::new(
name,
@ -622,7 +622,7 @@ mod test {
// define data in two partitions
let batch1 = RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(Float64Array::from(vec![Some(2.0), Some(4.0), None])),
Arc::new(Int64Array::from(vec![Some(20), Some(40), None])),
@ -635,7 +635,7 @@ mod test {
// No values in this batch
let batch2 = RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(Float64Array::from(vec![] as Vec<Option<f64>>)),
Arc::new(Int64Array::from(vec![] as Vec<Option<i64>>)),
@ -647,7 +647,7 @@ mod test {
.unwrap();
let batch3 = RecordBatch::try_new(
schema.clone(),
Arc::clone(&schema),
vec![
Arc::new(Float64Array::from(vec![Some(1.0), Some(5.0), Some(3.0)])),
Arc::new(Int64Array::from(vec![Some(10), Some(50), Some(30)])),
@ -666,8 +666,11 @@ mod test {
)
.unwrap();
let provider =
MemTable::try_new(schema.clone(), vec![vec![batch1], vec![batch2, batch3]]).unwrap();
let provider = MemTable::try_new(
Arc::clone(&schema),
vec![vec![batch1], vec![batch2, batch3]],
)
.unwrap();
let mut ctx = ExecutionContext::new();
ctx.register_table("t", Box::new(provider));

View File

@ -2,7 +2,8 @@
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use arrow_deps::datafusion::physical_plan::SendableRecordBatchStream;

View File

@ -80,7 +80,7 @@ where
fn clone(&self) -> Self {
Self {
chunk_table_schema: self.chunk_table_schema.clone(),
chunk: self.chunk.clone(),
chunk: Arc::clone(&self.chunk),
}
}
}
@ -207,7 +207,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
let scan_schema = project_schema(self.arrow_schema(), projection);
let plan = IOxReadFilterNode::new(
self.table_name.clone(),
Arc::clone(&self.table_name),
scan_schema,
self.chunk_and_infos.clone(),
predicate,

View File

@ -1,4 +1,5 @@
//! Holds a stream that ensures chunks have the same (uniform) schema
use std::sync::Arc;
use snafu::Snafu;
use std::task::{Context, Poll};
@ -180,18 +181,18 @@ impl SchemaAdapterStream {
.mappings
.iter()
.map(|mapping| match mapping {
ColumnMapping::FromInput(input_index) => batch.column(*input_index).clone(),
ColumnMapping::FromInput(input_index) => Arc::clone(&batch.column(*input_index)),
ColumnMapping::MakeNull(data_type) => new_null_array(data_type, batch.num_rows()),
})
.collect::<Vec<_>>();
RecordBatch::try_new(self.output_schema.clone(), output_columns)
RecordBatch::try_new(Arc::clone(&self.output_schema), output_columns)
}
}
impl RecordBatchStream for SchemaAdapterStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
Arc::clone(&self.output_schema)
}
}

View File

@ -51,7 +51,7 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
fn output_partitioning(&self) -> Partitioning {
@ -72,8 +72,8 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
// For some reason when I used an automatically derived `Clone` implementation
// the compiler didn't recognize the trait implementation
let new_self = Self {
table_name: self.table_name.clone(),
schema: self.schema.clone(),
table_name: Arc::clone(&self.table_name),
schema: Arc::clone(&self.schema),
chunk_and_infos: self.chunk_and_infos.clone(),
predicate: self.predicate.clone(),
};
@ -114,7 +114,7 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
))
})?;
let adapter = SchemaAdapterStream::try_new(stream, self.schema.clone())
let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
Ok(Box::pin(adapter))

View File

@ -187,13 +187,14 @@ impl TestDatabase {
let column_names = column_names.into_iter().collect::<StringSet>();
let column_names = Arc::new(column_names);
*(self.column_names.clone().lock().expect("mutex poisoned")) = Some(column_names)
*(Arc::clone(&self.column_names)
.lock()
.expect("mutex poisoned")) = Some(column_names)
}
/// Get the parameters from the last column name request
pub fn get_column_names_request(&self) -> Option<ColumnNamesRequest> {
self.column_names_request
.clone()
Arc::clone(&self.column_names_request)
.lock()
.expect("mutex poisoned")
.take()
@ -205,13 +206,14 @@ impl TestDatabase {
let column_values = column_values.into_iter().collect::<StringSet>();
let column_values = Arc::new(column_values);
*(self.column_values.clone().lock().expect("mutex poisoned")) = Some(column_values)
*(Arc::clone(&self.column_values)
.lock()
.expect("mutex poisoned")) = Some(column_values)
}
/// Get the parameters from the last column name request
pub fn get_column_values_request(&self) -> Option<ColumnValuesRequest> {
self.column_values_request
.clone()
Arc::clone(&self.column_values_request)
.lock()
.expect("mutex poisoned")
.take()
@ -219,17 +221,14 @@ impl TestDatabase {
/// Set the series that will be returned on a call to query_series
pub fn set_query_series_values(&self, plan: SeriesSetPlans) {
*(self
.query_series_values
.clone()
*(Arc::clone(&self.query_series_values)
.lock()
.expect("mutex poisoned")) = Some(plan);
}
/// Get the parameters from the last column name request
pub fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
self.query_series_request
.clone()
Arc::clone(&self.query_series_request)
.lock()
.expect("mutex poisoned")
.take()
@ -237,17 +236,14 @@ impl TestDatabase {
/// Set the series that will be returned on a call to query_groups
pub fn set_query_groups_values(&self, plan: SeriesSetPlans) {
*(self
.query_groups_values
.clone()
*(Arc::clone(&self.query_groups_values)
.lock()
.expect("mutex poisoned")) = Some(plan);
}
/// Get the parameters from the last column name request
pub fn get_query_groups_request(&self) -> Option<QueryGroupsRequest> {
self.query_groups_request
.clone()
Arc::clone(&self.query_groups_request)
.lock()
.expect("mutex poisoned")
.take()
@ -255,17 +251,14 @@ impl TestDatabase {
/// Set the FieldSet plan that will be returned
pub fn set_field_colum_names_values(&self, plan: FieldListPlan) {
*(self
.field_columns_value
.clone()
*(Arc::clone(&self.field_columns_value)
.lock()
.expect("mutex poisoned")) = Some(plan);
}
/// Get the parameters from the last column name request
pub fn get_field_columns_request(&self) -> Option<FieldColumnsRequest> {
self.field_columns_request
.clone()
Arc::clone(&self.field_columns_request)
.lock()
.expect("mutex poisoned")
.take()
@ -336,16 +329,12 @@ impl Database for TestDatabase {
let new_column_names_request = Some(ColumnNamesRequest { predicate });
*self
.column_names_request
.clone()
*Arc::clone(&self.column_names_request)
.lock()
.expect("mutex poisoned") = new_column_names_request;
// pull out the saved columns
let column_names = self
.column_names
.clone()
let column_names = Arc::clone(&self.column_names)
.lock()
.expect("mutex poisoned")
.take()
@ -363,15 +352,12 @@ impl Database for TestDatabase {
let field_columns_request = Some(FieldColumnsRequest { predicate });
*self
.field_columns_request
.clone()
*Arc::clone(&self.field_columns_request)
.lock()
.expect("mutex poisoned") = field_columns_request;
// pull out the saved columns
self.field_columns_value
.clone()
Arc::clone(&self.field_columns_value)
.lock()
.expect("mutex poisoned")
.take()
@ -395,16 +381,12 @@ impl Database for TestDatabase {
predicate,
});
*self
.column_values_request
.clone()
*Arc::clone(&self.column_values_request)
.lock()
.expect("mutex poisoned") = new_column_values_request;
// pull out the saved columns
let column_values = self
.column_values
.clone()
let column_values = Arc::clone(&self.column_values)
.lock()
.expect("mutex poisoned")
.take()
@ -421,14 +403,11 @@ impl Database for TestDatabase {
let new_queries_series_request = Some(QuerySeriesRequest { predicate });
*self
.query_series_request
.clone()
*Arc::clone(&self.query_series_request)
.lock()
.expect("mutex poisoned") = new_queries_series_request;
self.query_series_values
.clone()
Arc::clone(&self.query_series_values)
.lock()
.expect("mutex poisoned")
.take()
@ -447,14 +426,11 @@ impl Database for TestDatabase {
let new_queries_groups_request = Some(QueryGroupsRequest { predicate, gby_agg });
*self
.query_groups_request
.clone()
*Arc::clone(&self.query_groups_request)
.lock()
.expect("mutex poisoned") = new_queries_groups_request;
self.query_groups_values
.clone()
Arc::clone(&self.query_groups_values)
.lock()
.expect("mutex poisoned")
.take()
@ -622,16 +598,16 @@ impl DatabaseStore for TestDatabaseStore {
let mut databases = self.databases.lock().expect("mutex poisoned");
if let Some(db) = databases.get(name) {
Ok(db.clone())
Ok(Arc::clone(&db))
} else {
let new_db = Arc::new(TestDatabase::new());
databases.insert(name.to_string(), new_db.clone());
databases.insert(name.to_string(), Arc::clone(&new_db));
Ok(new_db)
}
}
fn executor(&self) -> Arc<Executor> {
self.executor.clone()
Arc::clone(&self.executor)
}
}