Make scapy imports in DHCP local (#55647)

pull/55855/head
Erik Montnemery 2021-09-06 19:10:27 +02:00 committed by GitHub
parent b99a22cd4d
commit dd7dea9a3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 12 deletions

View File

@ -14,13 +14,8 @@ from aiodiscover.discovery import (
IP_ADDRESS as DISCOVERY_IP_ADDRESS,
MAC_ADDRESS as DISCOVERY_MAC_ADDRESS,
)
from scapy.arch.common import compile_filter
from scapy.config import conf
from scapy.error import Scapy_Exception
from scapy.layers.dhcp import DHCP
from scapy.layers.inet import IP
from scapy.layers.l2 import Ether
from scapy.sendrecv import AsyncSniffer
from homeassistant.components.device_tracker.const import (
ATTR_HOST_NAME,
@ -282,6 +277,12 @@ class DHCPWatcher(WatcherBase):
async def async_start(self):
"""Start watching for dhcp packets."""
# Local import because importing from scapy has side effects such as opening
# sockets
from scapy.sendrecv import ( # pylint: disable=import-outside-toplevel
AsyncSniffer,
)
# disable scapy promiscuous mode as we do not need it
conf.sniff_promisc = 0
@ -318,6 +319,12 @@ class DHCPWatcher(WatcherBase):
def handle_dhcp_packet(self, packet):
"""Process a dhcp packet."""
# Local import because importing from scapy has side effects such as opening
# sockets
from scapy.layers.dhcp import DHCP # pylint: disable=import-outside-toplevel
from scapy.layers.inet import IP # pylint: disable=import-outside-toplevel
from scapy.layers.l2 import Ether # pylint: disable=import-outside-toplevel
if DHCP not in packet:
return
@ -382,4 +389,10 @@ def _verify_working_pcap(cap_filter):
If we cannot create a filter we will be listening for
all traffic which is too intensive.
"""
# Local import because importing from scapy has side effects such as opening
# sockets
from scapy.arch.common import ( # pylint: disable=import-outside-toplevel
compile_filter,
)
compile_filter(cap_filter)

View File

@ -390,9 +390,9 @@ async def test_setup_and_stop(hass):
)
await hass.async_block_till_done()
with patch("homeassistant.components.dhcp.AsyncSniffer.start") as start_call, patch(
with patch("scapy.sendrecv.AsyncSniffer.start") as start_call, patch(
"homeassistant.components.dhcp._verify_l2socket_setup",
), patch("homeassistant.components.dhcp.compile_filter",), patch(
), patch("scapy.arch.common.compile_filter"), patch(
"homeassistant.components.dhcp.DiscoverHosts.async_discover"
):
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
@ -461,12 +461,10 @@ async def test_setup_fails_with_broken_libpcap(hass, caplog):
)
await hass.async_block_till_done()
with patch("homeassistant.components.dhcp._verify_l2socket_setup",), patch(
"homeassistant.components.dhcp.compile_filter",
with patch("homeassistant.components.dhcp._verify_l2socket_setup"), patch(
"scapy.arch.common.compile_filter",
side_effect=ImportError,
) as compile_filter, patch(
"homeassistant.components.dhcp.AsyncSniffer",
) as async_sniffer, patch(
) as compile_filter, patch("scapy.sendrecv.AsyncSniffer") as async_sniffer, patch(
"homeassistant.components.dhcp.DiscoverHosts.async_discover"
):
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)