Filter out ASCII tab or newline from input URLs (#90348)

pull/90363/head
Franck Nijhof 2023-03-27 19:49:40 +02:00 committed by GitHub
parent f4fda55405
commit b033232b06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 0 deletions

View File

@ -35,6 +35,9 @@ FILTERS: Final = re.compile(
)
# fmt: on
# Unsafe bytes to be removed per WHATWG spec
UNSAFE_URL_BYTES = ["\t", "\r", "\n"]
@callback
def setup_security_filter(app: Application) -> None:
@ -51,6 +54,21 @@ def setup_security_filter(app: Application) -> None:
request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
) -> StreamResponse:
"""Process request and block commonly known exploit attempts."""
for unsafe_byte in UNSAFE_URL_BYTES:
if unsafe_byte in request.path:
_LOGGER.warning(
"Filtered a request with an unsafe byte in path: %s",
request.raw_path,
)
raise HTTPBadRequest
if unsafe_byte in request.query_string:
_LOGGER.warning(
"Filtered a request with unsafe byte query string: %s",
request.raw_path,
)
raise HTTPBadRequest
if FILTERS.search(_recursive_unquote(request.path)):
_LOGGER.warning(
"Filtered a potential harmful request to: %s", request.raw_path

View File

@ -107,3 +107,54 @@ async def test_bad_requests(
if fail_on_query_string:
message = "Filtered a request with a potential harmful query string:"
assert message in caplog.text
@pytest.mark.parametrize(
("request_path", "request_params", "fail_on_query_string"),
[
("/some\thing", {}, False),
("/new\nline/cinema", {}, False),
("/return\r/to/sender", {}, False),
("/", {"some": "\thing"}, True),
("/", {"\newline": "cinema"}, True),
("/", {"return": "t\rue"}, True),
],
)
async def test_bad_requests_with_unsafe_bytes(
request_path,
request_params,
fail_on_query_string,
aiohttp_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
loop,
) -> None:
"""Test request with unsafe bytes in their URLs."""
app = web.Application()
app.router.add_get("/{all:.*}", mock_handler)
setup_security_filter(app)
mock_api_client = await aiohttp_client(app)
# Manual params handling
if request_params:
raw_params = "&".join(f"{val}={key}" for val, key in request_params.items())
man_params = f"?{raw_params}"
else:
man_params = ""
http = urllib3.PoolManager()
resp = await loop.run_in_executor(
None,
http.request,
"GET",
f"http://{mock_api_client.host}:{mock_api_client.port}{request_path}{man_params}",
request_params,
)
assert resp.status == HTTPStatus.BAD_REQUEST
message = "Filtered a request with an unsafe byte in path:"
if fail_on_query_string:
message = "Filtered a request with unsafe byte query string:"
assert message in caplog.text