From 26214e1b2c6777e0fae866642b23420adaadd6c4 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Tue, 31 Dec 2024 23:18:57 +0700 Subject: [PATCH] fix(backend): Prevent HTTP requests access to internal IPV6 addresses for Agent Blocks (#9157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses: https://github.com/Significant-Gravitas/AutoGPT/security/advisories/GHSA-4c8v-hwxc-2356 Currently, no IPv6 is used by default on this system. However, the lack of block HTTP access prevention to internal systems with IPv6 could be a potential SSRF. ### Changes 🏗️ Prevent internal IPv6 address access on HTTP request blocks. ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: - [ ] ...
Example test plan - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly
#### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**)
Examples of configuration changes - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases
--- .../backend/backend/util/request.py | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/autogpt_platform/backend/backend/util/request.py b/autogpt_platform/backend/backend/util/request.py index f1eae6c24..10b716f2c 100644 --- a/autogpt_platform/backend/backend/util/request.py +++ b/autogpt_platform/backend/backend/util/request.py @@ -12,6 +12,7 @@ from backend.util.settings import Config # List of IP networks to block BLOCKED_IP_NETWORKS = [ # --8<-- [start:BLOCKED_IP_NETWORKS] + # IPv4 Ranges ipaddress.ip_network("0.0.0.0/8"), # "This" Network ipaddress.ip_network("10.0.0.0/8"), # Private-Use ipaddress.ip_network("127.0.0.0/8"), # Loopback @@ -20,6 +21,11 @@ BLOCKED_IP_NETWORKS = [ ipaddress.ip_network("192.168.0.0/16"), # Private-Use ipaddress.ip_network("224.0.0.0/4"), # Multicast ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use + # IPv6 Ranges + ipaddress.ip_network("::1/128"), # Loopback + ipaddress.ip_network("fc00::/7"), # Unique local addresses (ULA) + ipaddress.ip_network("fe80::/10"), # Link-local + ipaddress.ip_network("ff00::/8"), # Multicast # --8<-- [end:BLOCKED_IP_NETWORKS] ] @@ -28,13 +34,15 @@ HOSTNAME_REGEX = re.compile(r"^[A-Za-z0-9.-]+$") # Basic DNS-safe hostname patt def _canonicalize_url(url: str) -> str: - # Strip spaces and trailing slashes + """ + Normalizes the URL by: + 1. Stripping whitespace and trailing slashes. + 2. Ensuring the scheme is http:// or https:// if missing. + 3. Replacing backslashes with forward slashes. + """ url = url.strip().strip("/") - # Ensure the URL starts with http:// or https:// if not url.startswith(("http://", "https://")): url = "http://" + url - - # Replace backslashes with forward slashes to avoid parsing ambiguities url = url.replace("\\", "/") return url @@ -49,9 +57,11 @@ def _is_ip_blocked(ip: str) -> bool: def validate_url(url: str, trusted_origins: list[str]) -> str: """ - Validates the URL to prevent SSRF attacks by ensuring it does not point to a private - or untrusted IP address, unless whitelisted. + Validates the URL to prevent SSRF attacks by ensuring it does not point + to a private, link-local, or otherwise blocked IP address — unless + the hostname is explicitly trusted. """ + # Normalize/canonicalize input url = _canonicalize_url(url) parsed = urlparse(url) @@ -61,7 +71,7 @@ def validate_url(url: str, trusted_origins: list[str]) -> str: f"Scheme '{parsed.scheme}' is not allowed. Only HTTP/HTTPS are supported." ) - # Validate and IDNA encode the hostname + # Validate and IDNA encode hostname if not parsed.hostname: raise ValueError("Invalid URL: No hostname found.") @@ -75,11 +85,11 @@ def validate_url(url: str, trusted_origins: list[str]) -> str: if not HOSTNAME_REGEX.match(ascii_hostname): raise ValueError("Hostname contains invalid characters.") - # Rebuild the URL with the normalized, IDNA-encoded hostname + # Rebuild URL with IDNA-encoded hostname parsed = parsed._replace(netloc=ascii_hostname) url = str(urlunparse(parsed)) - # Check if hostname is a trusted origin (exact match) + # If hostname is trusted, skip IP-based checks if ascii_hostname in trusted_origins: return url @@ -92,11 +102,12 @@ def validate_url(url: str, trusted_origins: list[str]) -> str: if not ip_addresses: raise ValueError(f"No IP addresses found for {ascii_hostname}") - # Check if any resolved IP address falls into blocked ranges - for ip in ip_addresses: - if _is_ip_blocked(ip): + # Block any IP address that belongs to a blocked range + for ip_str in ip_addresses: + if _is_ip_blocked(ip_str): raise ValueError( - f"Access to private IP address {ip} for hostname {ascii_hostname} is not allowed." + f"Access to blocked or private IP address {ip_str} " + f"for hostname {ascii_hostname} is not allowed." ) return url @@ -104,7 +115,9 @@ def validate_url(url: str, trusted_origins: list[str]) -> str: class Requests: """ - A wrapper around the requests library that validates URLs before making requests. + A wrapper around the requests library that validates URLs before + making requests, preventing SSRF by blocking private networks and + other disallowed address spaces. """ def __init__( @@ -128,13 +141,16 @@ class Requests: def request( self, method, url, headers=None, allow_redirects=False, *args, **kwargs ) -> req.Response: + # Merge any extra headers if self.extra_headers is not None: headers = {**(headers or {}), **self.extra_headers} + # Validate the URL (with optional extra validator) url = validate_url(url, self.trusted_origins) if self.extra_url_validator is not None: url = self.extra_url_validator(url) + # Perform the request response = req.request( method, url,