respond to RFCs in PR #3398

pull/3368/head
KPrasch 2024-01-23 16:19:18 +01:00
parent 8fe7beba2d
commit 55f8b2102b
3 changed files with 31 additions and 58 deletions

View File

@ -27,7 +27,7 @@ from nucypher.policy.conditions.utils import (
evaluate_condition_lingo,
)
from nucypher.utilities.logging import Logger
from nucypher.utilities.networking import get_request_global_ipv4
from nucypher.utilities.networking import get_global_source_ipv4
DECRYPTION_REQUESTS_SUCCESSES = Counter(
"threshold_decryption_num_successes",
@ -299,7 +299,7 @@ def _make_rest_app(this_node, log: Logger) -> Flask:
@rest_app.route("/ping", methods=['GET'])
def ping():
"""Asks this node: What is my public IPv4 address?"""
ipv4 = get_request_global_ipv4(request=request)
ipv4 = get_global_source_ipv4(request=request)
if not ipv4:
return Response(
response="No public IPv4 address detected.",

View File

@ -208,43 +208,39 @@ def determine_external_ip_address(
return rest_host
def _is_global_ipv4(ip: str) -> bool:
"""Check if an IP address is a global IPv4 address according to RFC 1918."""
try:
ip = ip_address(ip.strip())
return isinstance(ip, IPv4Address) and ip.is_global
except AddressValueError:
return False
def _resolve_ipv4(ip: str) -> Optional[str]:
"""Resolve an IP address to IPv4 if required and possible."""
def _resolve_ipv4(ip: str) -> Optional[IPv4Address]:
"""
Resolve an IPv6 address to IPv4 if required and possible.
Returns None if there is no valid IPv4 address available.
"""
try:
ip = ip_address(ip.strip())
except AddressValueError:
return None
if isinstance(ip, IPv6Address) and ip.ipv4_mapped:
return str(ip.ipv4_mapped)
if isinstance(ip, IPv6Address):
return ip.ipv4_mapped # returns IPv4Address or None
elif isinstance(ip, IPv4Address):
return str(ip)
return ip
def _extract_ip(header: str, request: Request) -> Optional[str]:
"""Extract the first IP address from a request header."""
def _get_header_ips(header: str, request: Request) -> Optional[str]:
"""Yields source IP addresses in sequential order from a given request and header name."""
if header in request.headers:
for ip in request.headers[header].split(","):
yield ip
def _ip_sources(request: Request) -> str:
"""Iterate over all possible sources of IP addresses in a request's headers."""
"""Yields all possible sources of IP addresses in a given request's headers."""
for header in ["X-Forwarded-For", "X-Real-IP"]:
yield from _extract_ip(header, request)
yield from _get_header_ips(header, request)
yield request.remote_addr
def get_request_global_ipv4(request: Request) -> Optional[str]:
def get_global_source_ipv4(request: Request) -> Optional[str]:
"""
Resolve the first global IPv4 address in a request's headers.
If the request is forwarded from a proxy, the first global IP address in the chain is returned.
'X-Forwarded-For' (XFF) https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For#selecting_an_ip_address
@ -257,9 +253,9 @@ def get_request_global_ipv4(request: Request) -> Optional[str]:
In all cases, tests that the IPv4 range is global. RFC 1918 privately designated
address ranges must not be returned.
https://www.ietf.org/rfc/rfc1918.txt
"""
for ip_str in _ip_sources(request=request):
ipv4_address = _resolve_ipv4(ip_str)
if ipv4_address and _is_global_ipv4(ipv4_address):
return ipv4_address
if ipv4_address and ipv4_address.is_global:
return str(ipv4_address)

View File

@ -1,12 +1,13 @@
from ipaddress import IPv4Address
import pytest
from flask import Request
from nucypher.utilities.networking import (
LOOPBACK_ADDRESS,
_ip_sources,
_is_global_ipv4,
_resolve_ipv4,
get_request_global_ipv4,
get_global_source_ipv4,
)
@ -21,38 +22,12 @@ def mock_request_factory(mocker):
return _mock_request_factory
@pytest.mark.parametrize(
"ip, expected",
[
("8.8.8.8", True), # public IPv4
("192.168.1.1", False), # private IPv4
("2001:0db8:85a3:0000:0000:8a2e:0370:7334", False), # IPv6
],
)
def test_is_global_ipv4(ip, expected):
assert _is_global_ipv4(ip) == expected
def test_is_global_ipv4_with_invalid_address():
with pytest.raises(
ValueError, match="'not_an_ip' does not appear to be an IPv4 or IPv6 address"
):
_is_global_ipv4("not_an_ip")
def test_resolve_ipv4_with_valid_ipv4():
assert _resolve_ipv4("8.8.8.8") == "8.8.8.8"
assert _resolve_ipv4("8.8.8.8") == IPv4Address("8.8.8.8")
def test_resolve_ipv4_with_valid_mapped_ipv6():
assert _resolve_ipv4("::ffff:8.8.8.8") == "8.8.8.8"
def test_resolve_ipv4_with_invalid_ip():
with pytest.raises(
ValueError, match="'not_an_ip' does not appear to be an IPv4 or IPv6 address"
):
_is_global_ipv4("not_an_ip")
assert _resolve_ipv4("::ffff:8.8.8.8") == IPv4Address("8.8.8.8")
def test_resolve_ipv4_with_non_mapped_ipv6():
@ -81,19 +56,21 @@ def test_ip_sources_with_no_headers_but_remote_addr(mock_request_factory):
def test_get_request_global_ipv4_with_forwarded_ip(mock_request_factory):
request = mock_request_factory(headers={"X-Forwarded-For": "8.8.8.8, 192.168.1.1"})
assert get_request_global_ipv4(request) == "8.8.8.8"
assert get_global_source_ipv4(request) == "8.8.8.8"
assert isinstance(get_global_source_ipv4(request), str)
def test_get_request_global_ipv4_with_private_ip_only(mock_request_factory):
request = mock_request_factory(headers={"X-Forwarded-For": "192.168.1.1"})
assert get_request_global_ipv4(request) is None
assert get_global_source_ipv4(request) is None
def test_get_request_global_ipv4_with_no_headers_but_valid_remote_addr(
mock_request_factory,
):
request = mock_request_factory(remote_addr="8.8.8.8")
assert get_request_global_ipv4(request) == "8.8.8.8"
assert get_global_source_ipv4(request) == "8.8.8.8"
assert isinstance(get_global_source_ipv4(request), str)
def test_get_request_global_ipv4_with_invalid_remote_addr(mock_request_factory):
@ -101,4 +78,4 @@ def test_get_request_global_ipv4_with_invalid_remote_addr(mock_request_factory):
with pytest.raises(
ValueError, match="'not_an_ip' does not appear to be an IPv4 or IPv6 address"
):
get_request_global_ipv4(request)
get_global_source_ipv4(request)