Handle public IPv4 address resolution for incoming requests via /ping

pull/3368/head
KPrasch 2024-01-19 11:01:17 +01:00
parent 6163ff4d25
commit 28af3f9eda
2 changed files with 68 additions and 5 deletions

View File

@ -27,6 +27,7 @@ from nucypher.policy.conditions.utils import (
evaluate_condition_lingo,
)
from nucypher.utilities.logging import Logger
from nucypher.utilities.networking import get_request_public_ipv4
DECRYPTION_REQUESTS_SUCCESSES = Counter(
"threshold_decryption_num_successes",
@ -297,9 +298,14 @@ def _make_rest_app(this_node, log: Logger) -> Flask:
@rest_app.route("/ping", methods=['GET'])
def ping():
"""Asks this node: What is my IP address?"""
requester_ip_address = request.remote_addr
return Response(requester_ip_address, status=HTTPStatus.OK)
"""Asks this node: What is my public IPv4 address?"""
ipv4 = get_request_public_ipv4(request=request)
if not ipv4:
return Response(
response="No public IPv4 address detected.",
status=HTTPStatus.BAD_GATEWAY,
)
return Response(response=ipv4, status=HTTPStatus.OK)
@rest_app.route('/status/', methods=['GET'])
def status():

View File

@ -1,9 +1,10 @@
import random
from http import HTTPStatus
from ipaddress import ip_address
from typing import Optional, Union
from ipaddress import AddressValueError, IPv4Address, IPv6Address, ip_address
from typing import List, Optional, Union
import requests
from flask import Request
from requests.exceptions import HTTPError, RequestException
from nucypher.acumen.perception import FleetSensor
@ -200,4 +201,60 @@ def determine_external_ip_address(
# complete failure!
if not rest_host:
raise UnknownIPAddress('External IP address detection failed')
return rest_host
def _is_valid_ipv4(ip: str) -> bool:
try:
ip = ip_address(ip.strip())
return isinstance(ip, IPv4Address) and ip.is_global
except AddressValueError:
return False
def _ipv4_to_ipv6(ip: str) -> Optional[str]:
try:
ip = ip_address(ip.strip())
if isinstance(ip, IPv6Address) and ip.ipv4_mapped:
return str(ip.ipv4_mapped)
except AddressValueError:
return None
return None
def _ip_sources(request: Request, trusted_proxies: Optional[List[str]] = None) -> str:
if not trusted_proxies:
trusted_proxies = []
for header in ["X-Forwarded-For", "X-Real-IP"]:
if header in request.headers:
if trusted_proxies and (request.remote_addr not in trusted_proxies):
yield None # Do not trust this request
for ip in request.headers[header].split(","):
yield ip
yield request.remote_addr
def get_request_public_ipv4(
request: Request, trusted_proxies: Optional[List[str]] = None
) -> Optional[str]:
"""
If the request is forwarded from a proxy, the first global IP address in the chain is returned.
'X-Forwarded-For' (XFF) https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For#selecting_an_ip_address
If XFF is not present in the request headers, this method also checks for 'X-Real-IP',
a popular non-standard header conventionally configured in some proxies like nginx.
Finally, if neither XFF nor X-Real-IP are present, the request is assumed to be
direct and the remote address is returned.
In all cases, tests that the IPv4 range is global. RFC 1918 privately designated
address ranges must not be returned.
https://www.ietf.org/rfc/rfc1918.txt
Optionally, a list of trusted proxies can be provided to help mitigate spoofing attacks.
"""
for ip_str in _ip_sources(request=request, trusted_proxies=trusted_proxies):
ipv4_address = _ipv4_to_ipv6(ip_str)
if ipv4_address and _is_valid_ipv4(ipv4_address):
return ipv4_address