fix(backend): Prevent HTTP requests access to internal IPV6 addresses for Agent Blocks (#9157)

Addresses:
https://github.com/Significant-Gravitas/AutoGPT/security/advisories/GHSA-4c8v-hwxc-2356

Currently, no IPv6 is used by default on this system. However, the lack
of block HTTP access prevention to internal systems with IPv6 could be a
potential SSRF.

### Changes 🏗️

Prevent internal IPv6 address access on HTTP request blocks.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>
pull/9121/head^2
Zamil Majdy 2024-12-31 23:18:57 +07:00 committed by GitHub
parent 10fc7d2114
commit 26214e1b2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 30 additions and 14 deletions

View File

@ -12,6 +12,7 @@ from backend.util.settings import Config
# List of IP networks to block
BLOCKED_IP_NETWORKS = [
# --8<-- [start:BLOCKED_IP_NETWORKS]
# IPv4 Ranges
ipaddress.ip_network("0.0.0.0/8"), # "This" Network
ipaddress.ip_network("10.0.0.0/8"), # Private-Use
ipaddress.ip_network("127.0.0.0/8"), # Loopback
@ -20,6 +21,11 @@ BLOCKED_IP_NETWORKS = [
ipaddress.ip_network("192.168.0.0/16"), # Private-Use
ipaddress.ip_network("224.0.0.0/4"), # Multicast
ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use
# IPv6 Ranges
ipaddress.ip_network("::1/128"), # Loopback
ipaddress.ip_network("fc00::/7"), # Unique local addresses (ULA)
ipaddress.ip_network("fe80::/10"), # Link-local
ipaddress.ip_network("ff00::/8"), # Multicast
# --8<-- [end:BLOCKED_IP_NETWORKS]
]
@ -28,13 +34,15 @@ HOSTNAME_REGEX = re.compile(r"^[A-Za-z0-9.-]+$") # Basic DNS-safe hostname patt
def _canonicalize_url(url: str) -> str:
# Strip spaces and trailing slashes
"""
Normalizes the URL by:
1. Stripping whitespace and trailing slashes.
2. Ensuring the scheme is http:// or https:// if missing.
3. Replacing backslashes with forward slashes.
"""
url = url.strip().strip("/")
# Ensure the URL starts with http:// or https://
if not url.startswith(("http://", "https://")):
url = "http://" + url
# Replace backslashes with forward slashes to avoid parsing ambiguities
url = url.replace("\\", "/")
return url
@ -49,9 +57,11 @@ def _is_ip_blocked(ip: str) -> bool:
def validate_url(url: str, trusted_origins: list[str]) -> str:
"""
Validates the URL to prevent SSRF attacks by ensuring it does not point to a private
or untrusted IP address, unless whitelisted.
Validates the URL to prevent SSRF attacks by ensuring it does not point
to a private, link-local, or otherwise blocked IP address unless
the hostname is explicitly trusted.
"""
# Normalize/canonicalize input
url = _canonicalize_url(url)
parsed = urlparse(url)
@ -61,7 +71,7 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
f"Scheme '{parsed.scheme}' is not allowed. Only HTTP/HTTPS are supported."
)
# Validate and IDNA encode the hostname
# Validate and IDNA encode hostname
if not parsed.hostname:
raise ValueError("Invalid URL: No hostname found.")
@ -75,11 +85,11 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
if not HOSTNAME_REGEX.match(ascii_hostname):
raise ValueError("Hostname contains invalid characters.")
# Rebuild the URL with the normalized, IDNA-encoded hostname
# Rebuild URL with IDNA-encoded hostname
parsed = parsed._replace(netloc=ascii_hostname)
url = str(urlunparse(parsed))
# Check if hostname is a trusted origin (exact match)
# If hostname is trusted, skip IP-based checks
if ascii_hostname in trusted_origins:
return url
@ -92,11 +102,12 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
if not ip_addresses:
raise ValueError(f"No IP addresses found for {ascii_hostname}")
# Check if any resolved IP address falls into blocked ranges
for ip in ip_addresses:
if _is_ip_blocked(ip):
# Block any IP address that belongs to a blocked range
for ip_str in ip_addresses:
if _is_ip_blocked(ip_str):
raise ValueError(
f"Access to private IP address {ip} for hostname {ascii_hostname} is not allowed."
f"Access to blocked or private IP address {ip_str} "
f"for hostname {ascii_hostname} is not allowed."
)
return url
@ -104,7 +115,9 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
class Requests:
"""
A wrapper around the requests library that validates URLs before making requests.
A wrapper around the requests library that validates URLs before
making requests, preventing SSRF by blocking private networks and
other disallowed address spaces.
"""
def __init__(
@ -128,13 +141,16 @@ class Requests:
def request(
self, method, url, headers=None, allow_redirects=False, *args, **kwargs
) -> req.Response:
# Merge any extra headers
if self.extra_headers is not None:
headers = {**(headers or {}), **self.extra_headers}
# Validate the URL (with optional extra validator)
url = validate_url(url, self.trusted_origins)
if self.extra_url_validator is not None:
url = self.extra_url_validator(url)
# Perform the request
response = req.request(
method,
url,