Update string formatting to use f-string on components (#125987)

* Update string formatting to use f-string on components

* Update code given review feedback

* Use f-string

---------

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
pull/126257/head
Alberto Montes 2024-09-19 11:38:25 +02:00 committed by GitHub
parent b471a6e519
commit b2401bf2e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 81 additions and 99 deletions

View File

@ -888,7 +888,7 @@ class BrSensor(SensorEntity):
if sensor_type.startswith(PRECIPITATION_FORECAST):
result = {ATTR_ATTRIBUTION: data.get(ATTRIBUTION)}
if self._timeframe is not None:
result[TIMEFRAME_LABEL] = "%d min" % (self._timeframe)
result[TIMEFRAME_LABEL] = f"{self._timeframe} min"
self._attr_extra_state_attributes = result

View File

@ -101,7 +101,7 @@ class BrData:
if resp.status == HTTPStatus.OK:
result[SUCCESS] = True
else:
result[MESSAGE] = "Got http statuscode: %d" % (resp.status)
result[MESSAGE] = f"Got http statuscode: {resp.status}"
return result
except (TimeoutError, aiohttp.ClientError) as err:

View File

@ -86,15 +86,13 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
continue
if payload_dict:
payload = "{{{}}}".format(
",".join(f"{key}:{val}" for key, val in payload_dict.items())
)
payload = ",".join(f"{key}:{val}" for key, val in payload_dict.items())
send_data(
conf.get(CONF_URL),
conf.get(CONF_API_KEY),
str(conf.get(CONF_INPUTNODE)),
payload,
f"{{{payload}}}",
)
track_point_in_time(

View File

@ -138,8 +138,7 @@ class GraphiteFeeder(threading.Thread):
with suppress(ValueError):
things["state"] = state.state_as_number(new_state)
lines = [
"%s.%s.%s %f %i"
% (self._prefix, entity_id, key.replace(" ", "_"), value, now)
f"{self._prefix}.{entity_id}.{key.replace(' ', '_')} {value:f} {now}"
for key, value in things.items()
if isinstance(value, (float, int))
]

View File

@ -180,7 +180,7 @@ class DeviceWithPrograms(HomeConnectDevice):
ATTR_DEVICE: self,
ATTR_DESC: k,
ATTR_UNIT: unit,
ATTR_KEY: "BSH.Common.Option.{}".format(k.replace(" ", "")),
ATTR_KEY: f"BSH.Common.Option.{k.replace(' ', '')}",
ATTR_ICON: icon,
ATTR_DEVICE_CLASS: device_class,
ATTR_SIGN: sign,

View File

@ -111,7 +111,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the KIRA module and load platform."""
# note: module_name is not the HA device name. it's just a unique name
# to ensure the component and platform can share information
module_name = ("%s_%d" % (DOMAIN, idx)) if idx else DOMAIN
module_name = f"{DOMAIN}_{idx}" if idx else DOMAIN
device_name = module_conf.get(CONF_NAME, DOMAIN)
port = module_conf.get(CONF_PORT, DEFAULT_PORT)
host = module_conf.get(CONF_HOST, DEFAULT_HOST)

View File

@ -119,13 +119,13 @@ def rewrite_legacy(config: ConfigType) -> ConfigType:
else:
_LOGGER.warning("Legacy configuration format detected")
for i in range(1, 5):
name_key = "group_%d_name" % i
name_key = f"group_{i}_name"
if name_key in bridge_conf:
groups.append(
{
"number": i,
"type": bridge_conf.get(
"group_%d_type" % i, DEFAULT_LED_TYPE
f"group_{i}_type", DEFAULT_LED_TYPE
),
"name": bridge_conf.get(name_key),
}

View File

@ -173,7 +173,8 @@ class MySensorsLightRGB(MySensorsLight):
new_rgb: tuple[int, int, int] | None = kwargs.get(ATTR_RGB_COLOR)
if new_rgb is None:
return
hex_color = "{:02x}{:02x}{:02x}".format(*new_rgb)
red, green, blue = new_rgb
hex_color = f"{red:02x}{green:02x}{blue:02x}"
self.gateway.set_child_value(
self.node_id, self.child_id, self.value_type, hex_color, ack=1
)
@ -220,7 +221,8 @@ class MySensorsLightRGBW(MySensorsLightRGB):
new_rgbw: tuple[int, int, int, int] | None = kwargs.get(ATTR_RGBW_COLOR)
if new_rgbw is None:
return
hex_color = "{:02x}{:02x}{:02x}{:02x}".format(*new_rgbw)
red, green, blue, white = new_rgbw
hex_color = f"{red:02x}{green:02x}{blue:02x}{white:02x}"
self.gateway.set_child_value(
self.node_id, self.child_id, self.value_type, hex_color, ack=1
)

View File

@ -109,7 +109,7 @@ class NetioApiView(HomeAssistantView):
states, consumptions, cumulated_consumptions, start_dates = [], [], [], []
for i in range(1, 5):
out = "output%d" % i
out = f"output{i}"
states.append(data.get(f"{out}_state") == STATE_ON)
consumptions.append(float(data.get(f"{out}_consumption", 0)))
cumulated_consumptions.append(
@ -168,7 +168,8 @@ class NetioSwitch(SwitchEntity):
def _set(self, value):
val = list("uuuu")
val[int(self.outlet) - 1] = "1" if value else "0"
self.netio.get("port list {}".format("".join(val)))
val = "".join(val)
self.netio.get(f"port list {val}")
self.netio.states[int(self.outlet) - 1] = value
self.schedule_update_ha_state()

View File

@ -185,14 +185,13 @@ class NumatoAPI:
if (device_id, port) not in self.ports_registered:
self.ports_registered[(device_id, port)] = direction
else:
io = (
"input"
if self.ports_registered[(device_id, port)] == gpio.IN
else "output"
)
raise gpio.NumatoGpioError(
"Device {} port {} already in use as {}.".format(
device_id,
port,
"input"
if self.ports_registered[(device_id, port)] == gpio.IN
else "output",
)
f"Device {device_id} port {port} already in use as {io}."
)
def check_device_id(self, device_id: int) -> None:

View File

@ -55,7 +55,7 @@ class DBInterruptibleThreadPoolExecutor(InterruptibleThreadPoolExecutor):
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
thread_name = f"{self._thread_name_prefix or self}_{num_threads}"
executor_thread = threading.Thread(
name=thread_name,
target=_worker_with_shutdown_hook,

View File

@ -288,9 +288,11 @@ def _migrate_schema(
"The database is about to upgrade from schema version %s to %s%s",
current_version,
end_version,
f". {MIGRATION_NOTE_OFFLINE}"
if current_version < LIVE_MIGRATION_MIN_SCHEMA_VERSION
else "",
(
f". {MIGRATION_NOTE_OFFLINE}"
if current_version < LIVE_MIGRATION_MIN_SCHEMA_VERSION
else ""
),
)
schema_status = dataclass_replace(schema_status, current_version=end_version)
@ -475,11 +477,7 @@ def _add_columns(
try:
connection = session.connection()
connection.execute(
text(
"ALTER TABLE {table} {columns_def}".format(
table=table_name, columns_def=", ".join(columns_def)
)
)
text(f"ALTER TABLE {table_name} {', '.join(columns_def)}")
)
except (InternalError, OperationalError, ProgrammingError):
# Some engines support adding all columns at once,
@ -530,10 +528,8 @@ def _modify_columns(
if engine.dialect.name == SupportedDialect.POSTGRESQL:
columns_def = [
"ALTER {column} TYPE {type}".format(
**dict(zip(["column", "type"], col_def.split(" ", 1), strict=False))
)
for col_def in columns_def
f"ALTER {column} TYPE {type_}"
for column, type_ in (col_def.split(" ", 1) for col_def in columns_def)
]
elif engine.dialect.name == "mssql":
columns_def = [f"ALTER COLUMN {col_def}" for col_def in columns_def]
@ -544,11 +540,7 @@ def _modify_columns(
try:
connection = session.connection()
connection.execute(
text(
"ALTER TABLE {table} {columns_def}".format(
table=table_name, columns_def=", ".join(columns_def)
)
)
text(f"ALTER TABLE {table_name} {', '.join(columns_def)}")
)
except (InternalError, OperationalError):
_LOGGER.info("Unable to use quick column modify. Modifying 1 by 1")

View File

@ -56,7 +56,7 @@ async def _migrate_old_unique_ids(hass, devices):
def sense_to_mdi(sense_icon):
"""Convert sense icon to mdi icon."""
return "mdi:{}".format(MDI_ICONS.get(sense_icon, "power-plug"))
return f"mdi:{MDI_ICONS.get(sense_icon, "power-plug")}"
class SenseDevice(BinarySensorEntity):

View File

@ -78,7 +78,7 @@ TREND_SENSOR_VARIANTS = [
def sense_to_mdi(sense_icon):
"""Convert sense icon to mdi icon."""
return "mdi:{}".format(MDI_ICONS.get(sense_icon, "power-plug"))
return f"mdi:{MDI_ICONS.get(sense_icon, 'power-plug')}"
async def async_setup_entry(

View File

@ -82,7 +82,7 @@ class ImageProcessingSsocr(ImageProcessingEntity):
self.filepath = os.path.join(
self.hass.config.config_dir,
"ssocr-{}.png".format(self._name.replace(" ", "_")),
f"ssocr-{self._name.replace(' ', '_')}.png",
)
crop = [
"crop",

View File

@ -53,10 +53,8 @@ class ListTopItemsIntent(intent.IntentHandler):
if not items:
response.async_set_speech("There are no items on your shopping list")
else:
items_list = ", ".join(itm["name"] for itm in reversed(items))
response.async_set_speech(
"These are the top {} items on your shopping list: {}".format(
min(len(items), 5),
", ".join(itm["name"] for itm in reversed(items)),
)
f"These are the top {min(len(items), 5)} items on your shopping list: {items_list}"
)
return response

View File

@ -166,12 +166,11 @@ class SignalNotificationService(BaseNotificationService):
and int(str(resp.headers.get("Content-Length")))
> attachment_size_limit
):
content_length = int(str(resp.headers.get("Content-Length")))
raise ValueError( # noqa: TRY301
"Attachment too large (Content-Length reports {}). Max size: {}"
" bytes".format(
int(str(resp.headers.get("Content-Length"))),
CONF_MAX_ALLOWED_DOWNLOAD_SIZE_BYTES,
)
"Attachment too large (Content-Length reports "
f"{content_length}). Max size: "
f"{CONF_MAX_ALLOWED_DOWNLOAD_SIZE_BYTES} bytes"
)
size = 0

View File

@ -184,7 +184,7 @@ class Monitor(threading.Thread, SensorEntity):
value[2],
value[1],
)
self.data["temp"] = float("%d.%d" % (value[0], value[2]))
self.data["temp"] = float(f"{value[0]}.{value[2]}")
self.data["humid"] = value[1]
def terminate(self):

View File

@ -140,7 +140,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
slots = {}
for slot in request.get("slots", []):
slots[slot["slotName"]] = {"value": resolve_slot_values(slot)}
slots["{}_raw".format(slot["slotName"])] = {"value": slot["rawValue"]}
slots[f"{slot['slotName']}_raw"] = {"value": slot["rawValue"]}
slots["site_id"] = {"value": request.get("siteId")}
slots["session_id"] = {"value": request.get("sessionId")}
slots["confidenceScore"] = {"value": request["intent"]["confidenceScore"]}

View File

@ -92,9 +92,8 @@ class StarlingBalanceSensor(SensorEntity):
@property
def name(self):
"""Return the name of the sensor."""
return "{} {}".format(
self._account_name, self._balance_data_type.replace("_", " ").capitalize()
)
balance_data_type = self._balance_data_type.replace("_", " ").capitalize()
return f"{self._account_name} {balance_data_type}"
@property
def native_value(self):

View File

@ -80,7 +80,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
# Send attribute values
for key, value in states.items():
if isinstance(value, (float, int)):
stat = "{}.{}".format(state.entity_id, key.replace(" ", "_"))
stat = f"{state.entity_id}.{key.replace(' ', '_')}"
statsd_client.gauge(stat, value, sample_rate)
elif isinstance(_state, (float, int)):

View File

@ -367,12 +367,14 @@ class StreamMuxer:
data=self._memory_file.read(),
),
(
segment_duration := float(
(adjusted_dts - self._segment_start_dts) * packet.time_base
(
segment_duration := float(
(adjusted_dts - self._segment_start_dts) * packet.time_base
)
)
)
if last_part
else 0,
if last_part
else 0
),
)
if last_part:
# If we've written the last part, we can close the memory_file.

View File

@ -27,10 +27,9 @@ class SuplaEntity(CoordinatorEntity):
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return "supla-{}-{}".format(
self.channel_data["iodevice"]["gUIDString"].lower(),
self.channel_data["channelNumber"],
)
uid = self.channel_data["iodevice"]["gUIDString"].lower()
channel_number = self.channel_data["channelNumber"]
return f"supla-{uid}-{channel_number}"
@property
def name(self) -> str | None:

View File

@ -103,7 +103,7 @@ class SwissHydrologicalDataSensor(SensorEntity):
@property
def name(self):
"""Return the name of the sensor."""
return "{} {}".format(self._data["water-body-name"], self._condition)
return f"{self._data['water-body-name']} {self._condition}"
@property
def unique_id(self) -> str:

View File

@ -299,9 +299,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hass_path: str = HOMEASSISTANT_PATH[0]
config_dir = hass.config.config_dir
paths_re = re.compile(
r"(?:{})/(.*)".format("|".join([re.escape(x) for x in (hass_path, config_dir)]))
)
paths_re = re.compile(rf"(?:{re.escape(hass_path)}|{re.escape(config_dir)})/(.*)")
handler = LogErrorHandler(
hass, conf[CONF_MAX_ENTRIES], conf[CONF_FIRE_EVENT], paths_re
)

View File

@ -136,8 +136,8 @@ class Ted5000Gateway:
mtus = int(doc["LiveData"]["System"]["NumberMTU"])
for mtu in range(1, mtus + 1):
power = int(doc["LiveData"]["Power"]["MTU%d" % mtu]["PowerNow"])
voltage = int(doc["LiveData"]["Voltage"]["MTU%d" % mtu]["VoltageNow"])
power = int(doc["LiveData"]["Power"][f"MTU{mtu}"]["PowerNow"])
voltage = int(doc["LiveData"]["Voltage"][f"MTU{mtu}"]["VoltageNow"])
self.data[mtu] = {
UnitOfPower.WATT: power,

View File

@ -194,4 +194,4 @@ class TelldusLiveSensor(TelldusLiveEntity, SensorEntity):
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return "{}-{}-{}".format(*self._id)
return "-".join(self._id)

View File

@ -324,7 +324,7 @@ class TensorFlowImageProcessor(ImageProcessingEntity):
# Draw detected objects
for instance in values:
label = "{} {:.1f}%".format(category, instance["score"])
label = f"{category} {instance['score']:.1f}%"
draw_box(
draw, instance["box"], img_width, img_height, label, (255, 255, 0)
)

View File

@ -61,9 +61,10 @@ class TomatoDeviceScanner(DeviceScanner):
if port is None:
port = 443 if self.ssl else 80
protocol = "https" if self.ssl else "http"
self.req = requests.Request(
"POST",
"http{}://{}:{}/update.cgi".format("s" if self.ssl else "", host, port),
f"{protocol}://{host}:{port}/update.cgi",
data={"_http_id": http_id, "exec": "devlist"},
auth=requests.auth.HTTPBasicAuth(username, password),
).prepare()

View File

@ -84,10 +84,11 @@ class VenstarEntity(CoordinatorEntity[VenstarDataUpdateCoordinator]):
@property
def device_info(self) -> DeviceInfo:
"""Return the device information for this entity."""
fw_ver_major, fw_ver_minor = self._client.get_firmware_ver()
return DeviceInfo(
identifiers={(DOMAIN, self._config.entry_id)},
name=self._client.name,
manufacturer="Venstar",
model=f"{self._client.model}-{self._client.get_type()}",
sw_version="{}.{}".format(*(self._client.get_firmware_ver())),
sw_version=f"{fw_ver_major}.{fw_ver_minor}",
)

View File

@ -110,9 +110,7 @@ class VerisureSmartcam(CoordinatorEntity[VerisureDataUpdateCoordinator], Camera)
return
LOGGER.debug("Download new image %s", new_image_id)
new_image_path = os.path.join(
self._directory_path, "{}{}".format(new_image_id, ".jpg")
)
new_image_path = os.path.join(self._directory_path, f"{new_image_id}.jpg")
new_image_url = new_image["contentUrl"]
self.coordinator.verisure.download_image(new_image_url, new_image_path)
LOGGER.debug("Old image_id=%s", self._image_id)
@ -123,9 +121,7 @@ class VerisureSmartcam(CoordinatorEntity[VerisureDataUpdateCoordinator], Camera)
def delete_image(self, _=None) -> None:
"""Delete an old image."""
remove_image = os.path.join(
self._directory_path, "{}{}".format(self._image_id, ".jpg")
)
remove_image = os.path.join(self._directory_path, f"{self._image_id}.jpg")
try:
os.remove(remove_image)
LOGGER.debug("Deleting old image %s", remove_image)

View File

@ -174,7 +174,7 @@ class ViaggiaTrenoSensor(SensorEntity):
self._state = NO_INFORMATION_STRING
self._unit = ""
else:
self._state = "Error: {}".format(res["error"])
self._state = f"Error: {res['error']}"
self._unit = ""
else:
for i in MONITORED_INFO:

View File

@ -85,9 +85,7 @@ class YeelightConfigFlow(ConfigFlow, domain=DOMAIN):
) -> ConfigFlowResult:
"""Handle discovery from zeroconf."""
self._discovered_ip = discovery_info.host
await self.async_set_unique_id(
"{0:#0{1}x}".format(int(discovery_info.name[-26:-18]), 18)
)
await self.async_set_unique_id(f"{int(discovery_info.name[-26:-18]):#018x}")
return await self._async_handle_discovery_with_unique_id()
async def async_step_ssdp(

View File

@ -617,9 +617,11 @@ class ZHAGatewayProxy(EventBase):
ATTR_NWK: str(event.device_info.nwk),
ATTR_IEEE: str(event.device_info.ieee),
DEVICE_PAIRING_STATUS: event.device_info.pairing_status.name,
ATTR_MODEL: event.device_info.model
if event.device_info.model
else UNKNOWN_MODEL,
ATTR_MODEL: (
event.device_info.model
if event.device_info.model
else UNKNOWN_MODEL
),
ATTR_MANUFACTURER: manuf if manuf else UNKNOWN_MANUFACTURER,
ATTR_SIGNATURE: event.device_info.signature,
},
@ -922,9 +924,7 @@ class LogRelayHandler(logging.Handler):
hass_path: str = HOMEASSISTANT_PATH[0]
config_dir = self.hass.config.config_dir
self.paths_re = re.compile(
r"(?:{})/(.*)".format(
"|".join([re.escape(x) for x in (hass_path, config_dir)])
)
rf"(?:{re.escape(hass_path)}|{re.escape(config_dir)})/(.*)"
)
def emit(self, record: LogRecord) -> None:
@ -1025,9 +1025,9 @@ def cluster_command_schema_to_vol_schema(schema: CommandSchema) -> vol.Schema:
"""Convert a cluster command schema to a voluptuous schema."""
return vol.Schema(
{
vol.Optional(field.name)
if field.optional
else vol.Required(field.name): schema_type_to_vol(field.type)
(
vol.Optional(field.name) if field.optional else vol.Required(field.name)
): schema_type_to_vol(field.type)
for field in schema.fields
}
)

View File

@ -85,8 +85,8 @@ class ZWaveMeRGB(ZWaveMeEntity, LightEntity):
self.device.id, f"exact?level={round(brightness / 2.55)}"
)
return
cmd = "exact?red={}&green={}&blue={}"
cmd = cmd.format(*color) if any(color) else cmd.format(*(255, 255, 255))
red, green, blue = color if any(color) else (255, 255, 255)
cmd = f"exact?red={red}&green={green}&blue={blue}"
self.controller.zwave_api.send_command(self.device.id, cmd)
@property