2023-10-23 08:33:43 +00:00
|
|
|
import socket
|
|
|
|
import time
|
|
|
|
|
|
|
|
import pytest
|
2023-10-25 19:22:24 +00:00
|
|
|
from requests import RequestException, Session
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
from nucypher.utilities.certs import (
|
2023-10-25 19:22:24 +00:00
|
|
|
Address,
|
2023-10-23 08:33:43 +00:00
|
|
|
CertificateCache,
|
2023-10-25 19:22:24 +00:00
|
|
|
P2PSession,
|
|
|
|
SelfSignedCertificateAdapter,
|
2023-10-23 08:33:43 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
# Define test URLs
|
2023-10-23 09:38:20 +00:00
|
|
|
VALID_URL = "https://lynx.nucypher.network:9151/status"
|
2023-10-23 08:33:43 +00:00
|
|
|
INVALID_URL = "https://nonexistent-domain.com"
|
|
|
|
|
|
|
|
MOCK_CERT = """-----BEGIN CERTIFICATE-----
|
|
|
|
MIIDXTCCAkWgAwIBAgIJALm157+YvLEhMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
|
|
|
|
...
|
|
|
|
-----END CERTIFICATE-----"""
|
|
|
|
|
2024-02-09 13:16:59 +00:00
|
|
|
VALID_BUT_INCORRECT_CERT_FOR_VALID_URL = """-----BEGIN CERTIFICATE-----
|
|
|
|
MIIBgzCCAQigAwIBAgIUYZMjb9wgSIv0G3H9zP6Xezi3y6kwCgYIKoZIzj0EAwQw
|
|
|
|
GDEWMBQGA1UEAwwNMTg4LjE2Ni4yNy40NjAeFw0yNDAxMzAxMjQ0MzhaFw0yNTAx
|
|
|
|
MjkxMjQ0MzhaMBgxFjAUBgNVBAMMDTE4OC4xNjYuMjcuNDYwdjAQBgcqhkjOPQIB
|
|
|
|
BgUrgQQAIgNiAASnd+YYrbrV3WW/hb1+4+RRD/lWLkcgKM5JjZLjuwNU/Ndr1vEl
|
|
|
|
qOAwbz+fcdwgJ7SAkSoK2fQOt90NnnBPDA12MCc0ScwyiQxS7Cm382B4h3No4M4Z
|
|
|
|
E3bLLn1u69g9Y26jEzARMA8GA1UdEQQIMAaHBLymGy4wCgYIKoZIzj0EAwQDaQAw
|
|
|
|
ZgIxAL4cpbec9Hs8O4uXB8zESJJ32err5jejFhWOFexppRTNjhM5copO9c8x24zJ
|
|
|
|
IzqeQgIxALCe9ynrDkT/tOtBNjvPiNvR8aosRsgdsQCcbk3fUCsYXSXTuphpDgMf
|
|
|
|
IKaHuG9nuw==
|
|
|
|
-----END CERTIFICATE-----
|
|
|
|
"""
|
|
|
|
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
def cache():
|
|
|
|
return CertificateCache()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
def adapter(cache):
|
2024-02-09 11:45:07 +00:00
|
|
|
_adapter = SelfSignedCertificateAdapter(certificate_cache=cache)
|
2023-10-23 08:33:43 +00:00
|
|
|
return _adapter
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
def session(adapter):
|
2023-10-23 09:38:20 +00:00
|
|
|
s = P2PSession()
|
2023-10-23 08:33:43 +00:00
|
|
|
s.adapter = adapter # Use the same adapter instance
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
|
|
def test_init_adapter(cache, adapter):
|
2023-10-23 09:38:20 +00:00
|
|
|
assert isinstance(adapter, SelfSignedCertificateAdapter)
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_cert_cache_set_get():
|
|
|
|
cache = CertificateCache()
|
2023-10-25 19:22:24 +00:00
|
|
|
address = Address("example.com", 443)
|
2023-10-23 08:33:43 +00:00
|
|
|
cache.set(address, MOCK_CERT)
|
|
|
|
assert cache.get(address) == MOCK_CERT
|
|
|
|
|
|
|
|
|
|
|
|
def test_cert_cache_expiry():
|
|
|
|
cache = CertificateCache(cache_duration=1)
|
2023-10-25 19:22:24 +00:00
|
|
|
address = Address("example.com", 443)
|
2023-10-23 08:33:43 +00:00
|
|
|
cache.set(address, MOCK_CERT)
|
|
|
|
assert not cache.is_expired(address)
|
|
|
|
# Wait for the cert to expire
|
|
|
|
time.sleep(2)
|
|
|
|
assert cache.is_expired(address)
|
|
|
|
|
|
|
|
|
|
|
|
def test_cache_cert(cache):
|
2023-10-25 19:22:24 +00:00
|
|
|
address = Address("example.com", 443)
|
|
|
|
cache.set(address, "cert_data")
|
|
|
|
assert cache.get(address) == "cert_data"
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_send_request(session, mocker):
|
2023-10-25 19:22:24 +00:00
|
|
|
mocked_refresh = mocker.patch.object(
|
|
|
|
session, "_refresh_certificate", return_value=MOCK_CERT
|
|
|
|
)
|
|
|
|
mocker.patch.object(Session, "send", return_value="response")
|
2023-10-23 08:33:43 +00:00
|
|
|
response = session.send(mocker.Mock(url=VALID_URL))
|
|
|
|
mocked_refresh.assert_called()
|
2023-10-25 19:22:24 +00:00
|
|
|
assert response == "response"
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
|
2024-02-08 17:34:42 +00:00
|
|
|
@pytest.mark.xfail(reason="This test uses a real network connection")
|
2023-10-23 08:33:43 +00:00
|
|
|
def test_https_request_with_cert_caching():
|
|
|
|
# Create a session with certificate caching
|
2023-10-23 09:38:20 +00:00
|
|
|
session = P2PSession()
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
# Send a request (it should succeed)
|
|
|
|
response = session.get(VALID_URL)
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
|
|
# Send another request to the same URL (it should use the cached certificate)
|
|
|
|
response = session.get(VALID_URL)
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
|
|
|
2024-02-08 17:34:42 +00:00
|
|
|
@pytest.mark.xfail(reason="This test uses a real network connection")
|
2023-10-23 08:33:43 +00:00
|
|
|
def test_https_request_with_cert_refresh():
|
|
|
|
# Create a session with certificate caching
|
2023-10-23 09:38:20 +00:00
|
|
|
session = P2PSession()
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
# Send a request (it should succeed)
|
|
|
|
response = session.get(VALID_URL)
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
|
|
# Manually expire the cached certificate
|
2023-10-23 09:38:20 +00:00
|
|
|
hostname, port = P2PSession._resolve_address(VALID_URL)
|
2024-02-09 16:01:03 +00:00
|
|
|
session.certificate_cache._expirations[(hostname, port)] = 0
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
# Send another request to the same URL (it should refresh the certificate)
|
|
|
|
response = session.get(VALID_URL)
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
|
|
|
2024-02-09 13:16:59 +00:00
|
|
|
def test_https_request_with_invalid_cached_cert_and_refresh():
|
|
|
|
# Create a session with certificate caching
|
|
|
|
session = P2PSession()
|
|
|
|
hostname, port = P2PSession._resolve_address(VALID_URL)
|
|
|
|
|
2024-02-09 16:01:03 +00:00
|
|
|
session.certificate_cache.set(
|
|
|
|
Address(hostname, port), VALID_BUT_INCORRECT_CERT_FOR_VALID_URL
|
|
|
|
)
|
2024-02-09 13:16:59 +00:00
|
|
|
|
|
|
|
# Send a request (it should succeed after retrying and refreshing cert)
|
|
|
|
response = session.get(VALID_URL)
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
|
|
|
2023-10-23 08:33:43 +00:00
|
|
|
def test_fetch_server_cert_socket_error(session, mocker):
|
2023-10-25 19:22:24 +00:00
|
|
|
mocker.patch("socket.create_connection", side_effect=socket.error)
|
|
|
|
address = Address("localhost", 443)
|
2023-10-23 08:33:43 +00:00
|
|
|
with pytest.raises(socket.error):
|
|
|
|
session._refresh_certificate(address)
|
|
|
|
|
|
|
|
|
|
|
|
def test_send_request_exception(session, mocker):
|
|
|
|
"""Test that a RequestException is raised when the request fails."""
|
|
|
|
mock_request = mocker.Mock()
|
2023-10-25 19:22:24 +00:00
|
|
|
mock_request.url = "https://localhost"
|
2023-10-23 08:33:43 +00:00
|
|
|
|
2023-10-25 19:22:24 +00:00
|
|
|
mocker.patch.object(session, "_refresh_certificate", return_value=MOCK_CERT)
|
|
|
|
mocker.patch("requests.Session.send", side_effect=RequestException)
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
with pytest.raises(RequestException):
|
|
|
|
session.send(mock_request)
|
|
|
|
|
|
|
|
|
|
|
|
def test_retry_on_request_exception(session, mocker):
|
|
|
|
"""Test to ensure that the request is retried upon encountering a RequestException."""
|
|
|
|
mock_request = mocker.Mock()
|
2023-10-25 19:22:24 +00:00
|
|
|
mock_request.url = "https://localhost"
|
2023-10-23 08:33:43 +00:00
|
|
|
|
2023-10-25 19:22:24 +00:00
|
|
|
mocker.patch.object(session, "_refresh_certificate", return_value=MOCK_CERT)
|
|
|
|
mocker.patch("requests.Session.send", side_effect=[RequestException, "response"])
|
2023-10-23 08:33:43 +00:00
|
|
|
|
|
|
|
response = session.send(mock_request)
|
2023-10-25 19:22:24 +00:00
|
|
|
assert response == "response"
|