Add Basic Authentication tests.

pull/2664/head
derekpierre 2021-06-21 16:32:40 -04:00
parent dc2b75b0ba
commit e8f0e20400
8 changed files with 173 additions and 4 deletions

View File

@ -177,7 +177,7 @@ def run(general_config,
emitter.message(f"Provider: {provider_uri}", color='green')
if basic_auth_filepath:
emitter.message(f"Basic Authentication enabled", color='green')
emitter.message("Basic Authentication enabled", color='green')
controller = PORTER.make_web_controller(htpasswd_filepath=basic_auth_filepath, crash_on_error=False)
http_scheme = "https" if is_https else "http"

View File

@ -140,7 +140,7 @@ PORTER_REQUIRES = ['flask-htpasswd'] # needed for basic authentication
EXTRAS = {
# Admin
'dev': DEV_REQUIRES + URSULA_REQUIRES + ALICE_REQUIRES,
'dev': DEV_REQUIRES + URSULA_REQUIRES + ALICE_REQUIRES + PORTER_REQUIRES,
'benchmark': DEV_REQUIRES + BENCHMARK_REQUIRES,
'deploy': DEPLOY_REQUIRES,

View File

@ -21,7 +21,11 @@ from pathlib import Path
import pytest
from nucypher.characters.lawful import Ursula
from nucypher.cli.literature import PORTER_RUN_MESSAGE, BOTH_TLS_KEY_AND_CERTIFICATION_MUST_BE_PROVIDED
from nucypher.cli.literature import (
PORTER_RUN_MESSAGE,
BOTH_TLS_KEY_AND_CERTIFICATION_MUST_BE_PROVIDED,
BASIC_AUTH_REQUIRES_HTTPS
)
from nucypher.cli.main import nucypher_cli
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.utilities.porter.porter import Porter
@ -152,6 +156,28 @@ def test_federated_cli_run_https(click_runner, federated_ursulas, temp_dir_path,
assert PORTER_RUN_MESSAGE.format(http_scheme="https", http_port=Porter.DEFAULT_PORT) in result.output
def test_federated_cli_run_https_basic_auth(click_runner,
federated_ursulas,
federated_teacher_uri,
temp_dir_path,
basic_auth_file):
tls_key_path = Path(temp_dir_path) / 'key.pem'
_write_random_data(tls_key_path)
certificate_file_path = Path(temp_dir_path) / 'fullchain.pem'
_write_random_data(certificate_file_path)
porter_run_command = ('porter', 'run',
'--dry-run',
'--federated-only',
'--teacher', federated_teacher_uri,
'--tls-key-filepath', tls_key_path,
'--tls-certificate-filepath', certificate_file_path,
'--basic-auth-filepath', basic_auth_file)
result = click_runner.invoke(nucypher_cli, porter_run_command, catch_exceptions=False)
assert result.exit_code == 0
assert "Basic Authentication enabled" in result.output
def test_blockchain_porter_cli_run_simple(click_runner,
blockchain_ursulas,
testerchain,
@ -247,6 +273,48 @@ def test_blockchain_porter_cli_run_https(click_runner,
assert PORTER_RUN_MESSAGE.format(http_scheme="https", http_port=Porter.DEFAULT_PORT) in result.output
def test_blockchain_porter_cli_run_https_basic_auth(click_runner,
blockchain_ursulas,
blockchain_teacher_uri,
testerchain,
agency_local_registry,
temp_dir_path,
basic_auth_file
):
tls_key_path = Path(temp_dir_path) / 'key.pem'
_write_random_data(tls_key_path)
certificate_file_path = Path(temp_dir_path) / 'fullchain.pem'
_write_random_data(certificate_file_path)
# Basic Auth requires https - missing both tls parameters
porter_run_command = ('porter', 'run',
'--dry-run',
'--network', TEMPORARY_DOMAIN,
'--provider', TEST_PROVIDER_URI,
'--registry-filepath', agency_local_registry.filepath,
'--teacher', blockchain_teacher_uri,
'--basic-auth-filepath', basic_auth_file)
result = click_runner.invoke(nucypher_cli, porter_run_command, catch_exceptions=False)
assert result.exit_code != 0
assert BASIC_AUTH_REQUIRES_HTTPS in result.output
# Basic Auth
porter_run_command = ('porter', 'run',
'--dry-run',
'--network', TEMPORARY_DOMAIN,
'--provider', TEST_PROVIDER_URI,
'--registry-filepath', agency_local_registry.filepath,
'--teacher', blockchain_teacher_uri,
'--tls-key-filepath', tls_key_path,
'--tls-certificate-filepath', certificate_file_path,
'--basic-auth-filepath', basic_auth_file)
result = click_runner.invoke(nucypher_cli, porter_run_command, catch_exceptions=False)
assert result.exit_code == 0
assert "Basic Authentication enabled" in result.output
def _write_random_data(filepath: Path):
with filepath.open('wb') as file:
file.write(os.urandom(24))

View File

@ -27,6 +27,12 @@ def blockchain_porter_web_controller(blockchain_porter):
yield web_controller.test_client()
@pytest.fixture(scope='module')
def blockchain_porter_basic_auth_web_controller(blockchain_porter, basic_auth_file):
web_controller = blockchain_porter.make_web_controller(crash_on_error=True, htpasswd_filepath=basic_auth_file)
yield web_controller.test_client()
#
# RPC
#

View File

@ -106,6 +106,8 @@ def test_publish_and_get_treasure_map(blockchain_porter_web_controller,
}
response = blockchain_porter_web_controller.post('/publish_treasure_map', data=json.dumps(publish_treasure_map_params))
assert response.status_code == 200
response_data = json.loads(response.data)
assert response_data['result']['published']
# try getting the recently published treasure map
map_id = blockchain_bob.construct_map_id(blockchain_alice.stamp,
@ -115,7 +117,29 @@ def test_publish_and_get_treasure_map(blockchain_porter_web_controller,
'bob_encrypting_key': blockchain_bob_encrypting_key.hex()
}
response = blockchain_porter_web_controller.get('/get_treasure_map',
data=json.dumps(get_treasure_map_params))
data=json.dumps(get_treasure_map_params))
assert response.status_code == 200
response_data = json.loads(response.data)
assert response_data['result']['treasure_map'] == b64encode(bytes(treasure_map)).decode()
def test_get_ursulas_basic_auth(blockchain_porter_basic_auth_web_controller):
quantity = 4
duration = 2
get_ursulas_params = {
'quantity': quantity,
'duration_periods': duration,
}
response = blockchain_porter_basic_auth_web_controller.get('/get_ursulas', data=json.dumps(get_ursulas_params))
assert response.status_code == 401 # user is unauthorized
credentials = b64encode(b"admin:admin").decode('utf-8')
response = blockchain_porter_basic_auth_web_controller.get('/get_ursulas',
data=json.dumps(get_ursulas_params),
headers={"Authorization": f"Basic {credentials}"})
assert response.status_code == 200 # success - access allowed
response_data = json.loads(response.data)
ursulas_info = response_data['result']['ursulas']
returned_ursula_addresses = {ursula_info['checksum_address'] for ursula_info in ursulas_info} # ensure no repeats
assert len(returned_ursula_addresses) == quantity

View File

@ -23,6 +23,7 @@ import shutil
import tempfile
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Callable, Tuple
import maya
@ -1087,3 +1088,16 @@ def mock_teacher_nodes(mocker):
def disable_interactive_keystore_generation(mocker):
# Do not notify or confirm mnemonic seed words during tests normally
mocker.patch.object(Keystore, '_confirm_generate')
#
# Web Auth
#
@pytest.fixture(scope='module')
def basic_auth_file(temp_dir_path):
basic_auth = Path(temp_dir_path) / 'htpasswd'
with basic_auth.open("w") as f:
# username: "admin", password: "admin"
f.write("admin:$apr1$hlEpWVoI$0qjykXrvdZ0yO2TnBggQO0\n")
yield basic_auth
basic_auth.unlink()

View File

@ -26,6 +26,12 @@ def federated_porter_web_controller(federated_porter):
yield web_controller.test_client()
@pytest.fixture(scope='module')
def federated_porter_basic_auth_web_controller(federated_porter, basic_auth_file):
web_controller = federated_porter.make_web_controller(crash_on_error=True, htpasswd_filepath=basic_auth_file)
yield web_controller.test_client()
#
# RPC
#

View File

@ -108,6 +108,8 @@ def test_publish_and_get_treasure_map(federated_porter_web_controller,
response = federated_porter_web_controller.post('/publish_treasure_map',
data=json.dumps(publish_treasure_map_params))
assert response.status_code == 200
response_data = json.loads(response.data)
assert response_data['result']['published']
# try getting the random treasure map now
get_treasure_map_params = {
@ -132,3 +134,52 @@ def test_publish_and_get_treasure_map(federated_porter_web_controller,
assert response.status_code == 200
response_data = json.loads(response.data)
assert response_data['result']['treasure_map'] == b64encode(bytes(enacted_federated_policy.treasure_map)).decode()
def test_endpoints_basic_auth(federated_porter_basic_auth_web_controller):
# /get_ursulas
quantity = 4
duration = 2 # irrelevant for federated (but required)
get_ursulas_params = {
'quantity': quantity,
'duration_periods': duration, # irrelevant for federated (but required)
}
response = federated_porter_basic_auth_web_controller.get('/get_ursulas', data=json.dumps(get_ursulas_params))
assert response.status_code == 401 # user unauthorized
random_bob_encrypting_key = UmbralPublicKey.from_bytes(
bytes.fromhex("026d1f4ce5b2474e0dae499d6737a8d987ed3c9ab1a55e00f57ad2d8e81fe9e9ac"))
random_treasure_map_id = "f6ec73c93084ce91d5542a4ba6070071f5565112fe19b26ae9c960f9d658903a" # federated is 32 bytes
random_treasure_map = b64decode("Qld7S8sbKFCv2B8KxfJo4oxiTOjZ4VPyqTK5K1xK6DND6TbLg2hvlGaMV69aiiC5QfadB82w/5q1"
"Sw+SNFHN2esWgAbs38QuUVUGCzDoWzQAAAGIAuhw12ZiPMNV8LaeWV8uUN+au2HGOjWilqtKsaP9f"
"mnLAzFiTUAu9/VCxOLOQE88BPoWk1H7OxRLDEhnBVYyflpifKbOYItwLLTtWYVFRY90LtNSAzS8d3v"
"NH4c3SHSZwYsCKY+5LvJ68GD0CqhydSxCcGckh0unttHrYGSOQsURUI4AAAEBsSMlukjA1WyYA+Fouq"
"kuRtk8bVHcYLqRUkK2n6dShEUGMuY1SzcAbBINvJYmQp+hhzK5m47AzCl463emXepYZQC/evytktG7y"
"Xxd3k8Ak+Qr7T4+G2VgJl4YrafTpIT6wowd+8u/SMSrrf/M41OhtLeBC4uDKjO3rYBQfVLTpEAgiX/9"
"jxB80RtNMeCwgcieviAR5tlw2IlxVTEhxXbFeopcOZmfEuhVWqgBUfIakqsNCXkkubV0XS2l5G1vtTM8"
"oNML0rP8PyKd4+0M5N6P/EQqFkHH93LCDD0IQBq9usm3MoJp0eT8N3m5gprI05drDh2xe/W6qnQfw3YXn"
"jdvf2A=")
# /get_treasure_map
get_treasure_map_params = {
'treasure_map_id': random_treasure_map_id,
'bob_encrypting_key': random_bob_encrypting_key.hex()
}
response = federated_porter_basic_auth_web_controller.get('/get_treasure_map', data=json.dumps(get_treasure_map_params))
assert response.status_code == 401 # user not authenticated
# /publish_treasure_map
publish_treasure_map_params = {
'treasure_map': b64encode(bytes(random_treasure_map)).decode(),
'bob_encrypting_key': random_bob_encrypting_key.hex()
}
response = federated_porter_basic_auth_web_controller.post('/publish_treasure_map',
data=json.dumps(publish_treasure_map_params))
assert response.status_code == 401 # user not authenticated
# try get_ursulas with authentication
credentials = b64encode(b"admin:admin").decode('utf-8')
response = federated_porter_basic_auth_web_controller.get('/get_ursulas',
data=json.dumps(get_ursulas_params),
headers={"Authorization": f"Basic {credentials}"})
assert response.status_code == 200 # success