##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

from unittest.mock import patch, MagicMock, mock_open

from flask import current_app, redirect, session

import config as app_config
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from pgadmin.authenticate.registry import AuthSourceRegistry
from pgadmin.utils.constants import OAUTH2, INTERNAL


class Oauth2LoginMockTestCase(BaseTestGenerator):
    """
    This class checks oauth2 login functionality by mocking
    External Oauth2 Authentication.

    Every scenario carries four attributes:

    * ``oauth2_provider`` - the ``OAUTH2_NAME`` of the provider under test.
    * ``kind`` - selects the ``_test_*`` method to run (see
      ``_KIND_HANDLERS``).
    * ``profile`` - the dict the mocked ``get_user_profile`` returns.
    * ``id_token_claims`` - when not ``None``, stored as the ``userinfo``
      entry of the mocked ``oauth2_token`` session value.
    """

    # Dotted names handed to ``unittest.mock.patch``.
    _REGISTER_TARGET = 'pgadmin.authenticate.oauth2.OAuth.register'
    _ISFILE_TARGET = 'pgadmin.authenticate.oauth2.os.path.isfile'

    # ``config`` attributes overwritten by setUp()/individual tests; they
    # are snapshotted in setUpClass() and put back in tearDownClass().
    _RESTORED_SETTINGS = ('OAUTH2_CONFIG', 'OAUTH2_AUTO_CREATE_USER')

    # Scenario attributes passed to the login success/failure tests.
    _LOGIN_ARGS = ('oauth2_provider', 'profile', 'id_token_claims')

    # kind -> (method name, names of scenario attributes passed as args).
    _KIND_HANDLERS = {
        'external_redirect':
            ('_test_external_authentication', ('oauth2_provider',)),
        'pkce':
            ('_test_oauth2_authentication_with_pkce', ()),
        'public_pkce_registration':
            ('_test_public_client_pkce_registration', ()),
        'confidential_registration':
            ('_test_confidential_client_registration_unchanged', ()),
        'invalid_public_no_pkce':
            ('_test_public_client_missing_pkce_fails_fast', ()),
        'workload_identity_registration':
            ('_test_workload_identity_registration', ()),
        'workload_identity_client_assertion':
            ('_test_workload_identity_client_assertion', ()),
        'workload_identity_missing_token_file':
            ('_test_workload_identity_missing_token_file_fails_fast', ()),
        'login_success':
            ('_test_oauth2_login_success', _LOGIN_ARGS),
        'login_failure':
            ('_test_oauth2_login_failure', _LOGIN_ARGS),
        'oidc_get_user_profile_skip':
            ('_test_oidc_get_user_profile_skip_userinfo',
             ('oauth2_provider',)),
        'oidc_get_user_profile_call':
            ('_test_oidc_get_user_profile_calls_userinfo',
             ('oauth2_provider',)),
    }

    scenarios = [
        ('Oauth2 External Authentication', dict(
            oauth2_provider='github',
            kind='external_redirect',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Authentication', dict(
            oauth2_provider='github',
            kind='login_success',
            profile={'email': 'oauth2@gmail.com'},
            id_token_claims=None,
        )),
        ('Oauth2 Additional Claims Authentication', dict(
            oauth2_provider='auth-with-additional-claim-check',
            kind='login_success',
            profile={'email': 'oauth2@gmail.com', 'wids': ['789']},
            id_token_claims=None,
        )),
        ('Oauth2 PKCE Support', dict(
            oauth2_provider='keycloak-pkce',
            kind='pkce',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Public Client PKCE Registration', dict(
            oauth2_provider='public-pkce',
            kind='public_pkce_registration',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Confidential Client Registration', dict(
            oauth2_provider='github',
            kind='confidential_registration',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Invalid Public Client Config', dict(
            oauth2_provider='invalid-public',
            kind='invalid_public_no_pkce',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Workload Identity Registration', dict(
            oauth2_provider='workload-identity',
            kind='workload_identity_registration',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Workload Identity Client Assertion', dict(
            oauth2_provider='workload-identity',
            kind='workload_identity_client_assertion',
            profile={},
            id_token_claims=None,
        )),
        ('Oauth2 Workload Identity Missing Token File', dict(
            oauth2_provider='workload-identity',
            kind='workload_identity_missing_token_file',
            profile={},
            id_token_claims=None,
        )),
        ('OIDC Uses ID Token Claims', dict(
            oauth2_provider='oidc-basic',
            kind='login_success',
            profile={},
            id_token_claims={'email': 'oidc@example.com', 'sub': 'abc'},
        )),
        ('OIDC Falls Back To Profile Email', dict(
            oauth2_provider='oidc-basic',
            kind='login_success',
            profile={'email': 'fallback@example.com'},
            id_token_claims={'sub': 'abc'},
        )),
        ('OIDC Username Claim Precedence', dict(
            oauth2_provider='oidc-username-claim',
            kind='login_success',
            profile={'email': 'email@example.com'},
            id_token_claims={'preferred_username': 'preferred-user'},
        )),
        ('OIDC Additional Claims Via ID Token', dict(
            oauth2_provider='oidc-additional-claims',
            kind='login_success',
            profile={'email': 'claims@example.com'},
            id_token_claims={'groups': ['group-a']},
        )),
        ('OIDC Additional Claims Rejected', dict(
            oauth2_provider='oidc-additional-claims',
            kind='login_failure',
            profile={'email': 'claims@example.com'},
            id_token_claims={'groups': ['group-b']},
        )),
        ('OIDC get_user_profile Skips Userinfo', dict(
            oauth2_provider='oidc-basic',
            kind='oidc_get_user_profile_skip',
            profile={},
            id_token_claims=None,
        )),
        ('OIDC get_user_profile Calls Userinfo', dict(
            oauth2_provider='oidc-basic',
            kind='oidc_get_user_profile_call',
            profile={},
            id_token_claims=None,
        )),
    ]

    # ------------------------------------------------------------------
    # Fixture management
    # ------------------------------------------------------------------

    @classmethod
    def setUpClass(cls):
        """Logout the test client as we are testing OAuth2 login scenarios.

        Also snapshots the ``config`` attributes this class overwrites so
        tearDownClass() can put them back.
        """
        cls._saved_settings = {
            name: getattr(app_config, name)
            for name in cls._RESTORED_SETTINGS
            if hasattr(app_config, name)
        }
        cls.tester.logout()

    def setUp(self):
        """Point the configuration at OAuth2 and install test providers."""
        app_config.AUTHENTICATION_SOURCES = [OAUTH2]
        self.app.PGADMIN_EXTERNAL_AUTH_SOURCE = OAUTH2
        # Ensure OAuth2 users can be created during tests.
        app_config.OAUTH2_AUTO_CREATE_USER = True
        app_config.OAUTH2_CONFIG = [
            {
                'OAUTH2_NAME': 'github',
                'OAUTH2_DISPLAY_NAME': 'Github',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL':
                    'https://github.com/login/oauth/access_token',
                'OAUTH2_AUTHORIZATION_URL':
                    'https://github.com/login/oauth/authorize',
                'OAUTH2_API_BASE_URL': 'https://api.github.com/',
                'OAUTH2_USERINFO_ENDPOINT': 'user',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_ICON': 'fa-github',
                'OAUTH2_BUTTON_COLOR': '#3253a8',
            },
            {
                'OAUTH2_NAME': 'auth-with-additional-claim-check',
                'OAUTH2_DISPLAY_NAME': 'Additional Authorization',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL':
                    'https://dummy.com/123/oauth2/v2.0/token',
                'OAUTH2_AUTHORIZATION_URL':
                    'https://dummy.com/123/oauth2/v2.0/authorize',
                'OAUTH2_API_BASE_URL': 'https://graph.dummy.com/v1.0/',
                'OAUTH2_USERINFO_ENDPOINT': 'me',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_ICON': 'briefcase',
                'OAUTH2_BUTTON_COLOR': '#0000ff',
                'OAUTH2_ADDITIONAL_CLAIMS': {
                    'groups': ['123', '456'],
                    'wids': ['789']
                }
            },
            {
                'OAUTH2_NAME': 'keycloak-pkce',
                'OAUTH2_DISPLAY_NAME': 'Keycloak with PKCE',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL':
                    'https://keycloak.org/auth/realms/TEST-REALM/protocol/'
                    'openid-connect/token',
                'OAUTH2_AUTHORIZATION_URL':
                    'https://keycloak.org/auth/realms/TEST-REALM/protocol/'
                    'openid-connect/auth',
                'OAUTH2_API_BASE_URL':
                    'https://keycloak.org/auth/realms/TEST-REALM',
                'OAUTH2_USERINFO_ENDPOINT': 'user',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_SSL_CERT_VERIFICATION': True,
                'OAUTH2_ICON': 'fa-black-tie',
                'OAUTH2_BUTTON_COLOR': '#3253a8',
                'OAUTH2_CHALLENGE_METHOD': 'S256',
                'OAUTH2_RESPONSE_TYPE': 'code',
            },
            {
                'OAUTH2_NAME': 'public-pkce',
                'OAUTH2_DISPLAY_NAME': 'Public Client (PKCE)',
                'OAUTH2_CLIENT_ID': 'testclientid',
                # No secret: public client
                'OAUTH2_CLIENT_SECRET': '',
                'OAUTH2_TOKEN_URL': 'https://public.example/token',
                'OAUTH2_AUTHORIZATION_URL': 'https://public.example/auth',
                'OAUTH2_API_BASE_URL': 'https://public.example/',
                'OAUTH2_USERINFO_ENDPOINT': 'userinfo',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_SSL_CERT_VERIFICATION': True,
                'OAUTH2_CHALLENGE_METHOD': 'S256',
                'OAUTH2_RESPONSE_TYPE': 'code',
            },
            {
                'OAUTH2_NAME': 'oidc-basic',
                'OAUTH2_DISPLAY_NAME': 'OIDC Basic',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL': 'https://oidc.example/token',
                'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth',
                'OAUTH2_API_BASE_URL': 'https://oidc.example/',
                'OAUTH2_USERINFO_ENDPOINT': 'userinfo',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_SERVER_METADATA_URL':
                    'https://oidc.example/.well-known/openid-configuration',
            },
            {
                'OAUTH2_NAME': 'oidc-username-claim',
                'OAUTH2_DISPLAY_NAME': 'OIDC Username Claim',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL': 'https://oidc.example/token',
                'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth',
                'OAUTH2_API_BASE_URL': 'https://oidc.example/',
                'OAUTH2_USERINFO_ENDPOINT': 'userinfo',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_SERVER_METADATA_URL':
                    'https://oidc.example/.well-known/openid-configuration',
                'OAUTH2_USERNAME_CLAIM': 'preferred_username',
            },
            {
                'OAUTH2_NAME': 'oidc-additional-claims',
                'OAUTH2_DISPLAY_NAME': 'OIDC Additional Claims',
                'OAUTH2_CLIENT_ID': 'testclientid',
                'OAUTH2_CLIENT_SECRET': 'testclientsec',
                'OAUTH2_TOKEN_URL': 'https://oidc.example/token',
                'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth',
                'OAUTH2_API_BASE_URL': 'https://oidc.example/',
                'OAUTH2_USERINFO_ENDPOINT': 'userinfo',
                'OAUTH2_SCOPE': 'openid email profile',
                'OAUTH2_SERVER_METADATA_URL':
                    'https://oidc.example/.well-known/openid-configuration',
                'OAUTH2_ADDITIONAL_CLAIMS': {
                    'groups': ['group-a']
                }
            }
        ]

    def runTest(self):
        """This function checks oauth2 login functionality."""
        if app_config.SERVER_MODE is False:
            self.skipTest(
                "Can not run Oauth2 Authentication in the Desktop mode."
            )

        self._reset_oauth2_state()

        handler = self._KIND_HANDLERS.get(self.kind)
        if handler is None:
            self.fail(f'Unknown test kind: {self.kind}')

        method_name, arg_names = handler
        getattr(self, method_name)(
            *(getattr(self, arg_name) for arg_name in arg_names)
        )

    def tearDown(self):
        """Logout the test client after every scenario."""
        self.tester.logout()

    @classmethod
    def tearDownClass(cls):
        """
        We need to again login the test client as soon as test scenarios
        finishes.

        The OAuth2 settings changed by this class are restored first so
        they do not leak into whatever runs next.
        """
        cls.tester.logout()

        for name, value in getattr(cls, '_saved_settings', {}).items():
            setattr(app_config, name, value)

        app_config.AUTHENTICATION_SOURCES = [INTERNAL]
        app_config.PGADMIN_EXTERNAL_AUTH_SOURCE = INTERNAL
        # setUp() switched this attribute on the app object, so reset it
        # there as well, not only on the config module.
        app = getattr(cls, 'app', None)
        if app is not None:
            app.PGADMIN_EXTERNAL_AUTH_SOURCE = INTERNAL

        utils.login_tester_account(cls.tester)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_register_kwargs(mock_register, provider_name):
        """Return the kwargs of the register() call made for a provider.

        :param mock_register: mock that replaced ``OAuth.register``.
        :param provider_name: value expected in the ``name`` keyword.
        :raises AssertionError: when no recorded call used that name.
        """
        for _args, _kwargs in mock_register.call_args_list:
            if _kwargs.get('name') == provider_name:
                return _kwargs
        raise AssertionError(
            f'OAuth.register was not called for provider: {provider_name}'
        )

    @staticmethod
    def _workload_identity_config(token_file, include_metadata_url=True):
        """Build the single-provider workload identity OAUTH2_CONFIG.

        :param token_file: value for OAUTH2_WORKLOAD_IDENTITY_TOKEN_FILE.
        :param include_metadata_url: add OAUTH2_SERVER_METADATA_URL when
            True.
        :return: list holding one provider configuration dict.
        """
        entry = {
            'OAUTH2_NAME': 'workload-identity',
            'OAUTH2_DISPLAY_NAME': 'Workload Identity',
            'OAUTH2_CLIENT_ID': 'testclientid',
            'OAUTH2_CLIENT_SECRET': None,
            'OAUTH2_CLIENT_AUTH_METHOD': 'workload_identity',
            'OAUTH2_WORKLOAD_IDENTITY_TOKEN_FILE': token_file,
            'OAUTH2_TOKEN_URL': 'https://entra.example/token',
            'OAUTH2_AUTHORIZATION_URL': 'https://entra.example/auth',
            'OAUTH2_SCOPE': 'openid email profile',
        }
        if include_metadata_url:
            entry['OAUTH2_SERVER_METADATA_URL'] = (
                'https://entra.example/.well-known/openid-configuration'
            )
        return [entry]

    def _reset_oauth2_state(self):
        """Reset singleton caches so each subTest gets a clean OAuth2 state."""
        # Clear AuthSourceRegistry singleton instances.
        AuthSourceRegistry._objects = {}

        # Clear per-app cache of instantiated auth sources.
        with self.app.app_context():
            cached = getattr(current_app, '_pgadmin_auth_sources', None)
            if isinstance(cached, dict):
                cached.clear()
            else:
                setattr(current_app, '_pgadmin_auth_sources', {})

        # Clear OAuth2Authentication class-level caches.
        from pgadmin.authenticate.oauth2 import OAuth2Authentication
        OAuth2Authentication.oauth2_clients = {}
        OAuth2Authentication.oauth2_config = {}

    def _assert_oauth2_session_logged_in(self):
        """Assert the session records OAUTH2 as the current auth source."""
        with self.tester.session_transaction() as sess:
            asm = sess.get('auth_source_manager')
            self.assertIsNotNone(asm)
            self.assertEqual(asm.get('current_source'), OAUTH2)

    def _assert_oauth2_session_not_logged_in(self):
        """Assert the session holds no (or an empty) auth source manager."""
        with self.tester.session_transaction() as sess:
            asm = sess.get('auth_source_manager')
            self.assertTrue(asm is None or asm == {})

    def _submit_oauth2_login(self, provider):
        """Post the login form as if the provider's OAuth2 button was used."""
        return self.tester.login(
            email=None, password=None,
            _follow_redirects=True,
            headers=None,
            extra_form_data=dict(oauth2_button=provider)
        )

    def _login_with_mocked_oauth2(self, provider, profile, id_token_claims):
        """Run a login with authenticate/get_user_profile mocked out.

        :param provider: provider name set as ``oauth2_current_client``.
        :param profile: dict returned by the mocked ``get_user_profile``.
        :param id_token_claims: when not ``None``, stored as ``userinfo``
            inside the ``oauth2_token`` session entry.
        :return: response of the login request.
        """
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        def _fake_authenticate(auth, _form):
            auth.oauth2_current_client = provider
            # Important: AuthSourceManager may be constructed with a dict
            # form for oauth2_button flows, so avoid returning a username.
            return True, None

        def _fake_get_user_profile(_auth):
            if id_token_claims is not None:
                session['oauth2_token'] = {
                    'access_token': 'test-access-token',
                    'id_token': 'mock.jwt.token',
                    'token_type': 'Bearer',
                    'userinfo': id_token_claims
                }
            return profile

        with patch.object(
            OAuth2Authentication, 'authenticate', new=_fake_authenticate
        ), patch.object(
            OAuth2Authentication, 'get_user_profile',
            new=_fake_get_user_profile
        ):
            return self._submit_oauth2_login(provider)

    def _registered_kwargs(self, provider_name):
        """Instantiate OAuth2Authentication with OAuth.register mocked.

        :return: kwargs of the register() call made for ``provider_name``.
        """
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        with patch(self._REGISTER_TARGET) as mock_register:
            OAuth2Authentication()

        return self._get_register_kwargs(mock_register, provider_name)

    def _assert_construction_rejected(self, *fragments):
        """Assert OAuth2Authentication() raises ValueError before register.

        :param fragments: substrings that must appear in the error message.
        """
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        with patch(self._REGISTER_TARGET) as mock_register:
            with self.assertRaises(ValueError) as cm:
                OAuth2Authentication()

        message = str(cm.exception)
        for fragment in fragments:
            self.assertIn(fragment, message)
        mock_register.assert_not_called()

    # ------------------------------------------------------------------
    # Login flow tests
    # ------------------------------------------------------------------

    def _test_external_authentication(self, provider):
        """Ensure the user is redirected to an external URL."""
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        def _fake_authenticate(auth, _form):
            auth.oauth2_current_client = provider
            return False, redirect('https://example.com/')

        with patch.object(
            OAuth2Authentication, 'authenticate', new=_fake_authenticate
        ):
            # NOTE(review): if login() raises nothing this test passes
            # without checking anything; kept as-is because the redirect
            # handling of the test client is not visible here.
            try:
                self._submit_oauth2_login(provider)
            except Exception as e:
                self.assertEqual(
                    'Following external redirects is not supported.',
                    str(e)
                )

    def _test_oauth2_login_success(
        self, provider, profile, id_token_claims=None
    ):
        """A mocked OAuth2 login must end with an OAUTH2 session."""
        res = self._login_with_mocked_oauth2(
            provider, profile, id_token_claims
        )
        self.assertEqual(res.status_code, 200)
        self._assert_oauth2_session_logged_in()

    def _test_oauth2_login_failure(
        self, provider, profile, id_token_claims=None
    ):
        """A rejected OAuth2 login must leave the session logged out."""
        res = self._login_with_mocked_oauth2(
            provider, profile, id_token_claims
        )
        self.assertEqual(res.status_code, 200)
        self._assert_oauth2_session_not_logged_in()

    # ------------------------------------------------------------------
    # Client registration tests
    # ------------------------------------------------------------------

    def _test_oauth2_authentication_with_pkce(self):
        """
        Ensure that when PKCE parameters are configured, they are passed
        to the OAuth client registration as part of client_kwargs, and that
        the default client_kwargs is correctly included.
        """
        kwargs = self._registered_kwargs('keycloak-pkce')
        client_kwargs = kwargs.get('client_kwargs', {})

        # Check that PKCE and default client_kwargs are included
        expected = {
            'code_challenge_method': 'S256',
            'response_type': 'code',
            'scope': 'openid email profile',
            'verify': True,
        }
        for key, value in expected.items():
            self.assertEqual(client_kwargs.get(key), value)

    def _test_public_client_pkce_registration(self):
        """Public clients without a secret must set token auth to none."""
        kwargs = self._registered_kwargs('public-pkce')

        self.assertEqual(kwargs.get('token_endpoint_auth_method'), 'none')
        self.assertIsNone(kwargs.get('client_secret'))

        client_kwargs = kwargs.get('client_kwargs', {})
        self.assertEqual(
            client_kwargs.get('code_challenge_method'), 'S256')
        self.assertEqual(
            client_kwargs.get('response_type'), 'code')

    def _test_confidential_client_registration_unchanged(self):
        """
        Confidential clients must preserve existing registration
        behavior.
        """
        kwargs = self._registered_kwargs('github')
        self.assertNotIn('token_endpoint_auth_method', kwargs)
        self.assertEqual(kwargs.get('client_secret'), 'testclientsec')

    def _test_public_client_missing_pkce_fails_fast(self):
        """Public client configuration without PKCE must raise an error."""
        # Override config for this scenario only.
        app_config.OAUTH2_CONFIG = [{
            'OAUTH2_NAME': 'invalid-public',
            'OAUTH2_DISPLAY_NAME': 'Invalid Public',
            'OAUTH2_CLIENT_ID': 'testclientid',
            'OAUTH2_CLIENT_SECRET': None,
            'OAUTH2_TOKEN_URL': 'https://invalid.example/token',
            'OAUTH2_AUTHORIZATION_URL': 'https://invalid.example/auth',
            'OAUTH2_API_BASE_URL': 'https://invalid.example/',
            'OAUTH2_USERINFO_ENDPOINT': 'userinfo',
            'OAUTH2_SCOPE': 'openid email profile',
        }]

        self._assert_construction_rejected(
            'invalid-public',
            'OAUTH2_CLIENT_SECRET',
            'OAUTH2_CHALLENGE_METHOD',
            'OAUTH2_RESPONSE_TYPE',
        )

    # ------------------------------------------------------------------
    # get_user_profile tests
    # ------------------------------------------------------------------

    def _test_oidc_get_user_profile_skip_userinfo(self, provider):
        """Claims present in the token must avoid the userinfo request."""
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        with self.app.test_request_context('/'):
            oauth = OAuth2Authentication()
            oauth.oauth2_current_client = provider

            claims = {'email': 'oidc-skip@example.com', 'sub': 'abc'}

            client = MagicMock()
            client.authorize_access_token = MagicMock(return_value={
                'access_token': 't',
                'id_token': 'mock.jwt.token',
                'token_type': 'Bearer',
                'userinfo': claims
            })
            client.get = MagicMock(side_effect=AssertionError(
                'userinfo endpoint should not be called'))

            OAuth2Authentication.oauth2_clients[provider] = client
            profile = oauth.get_user_profile()
            self.assertEqual(profile.get('email'), 'oidc-skip@example.com')
            client.get.assert_not_called()

    def _test_oidc_get_user_profile_calls_userinfo(self, provider):
        """Empty token claims must trigger one userinfo request."""
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        with self.app.test_request_context('/'):
            oauth = OAuth2Authentication()
            oauth.oauth2_current_client = provider

            client = MagicMock()
            client.authorize_access_token = MagicMock(return_value={
                'access_token': 't',
                'token_type': 'Bearer',
                'userinfo': {}
            })

            resp = MagicMock()
            resp.raise_for_status = MagicMock()
            resp.json = MagicMock(
                return_value={'email': 'userinfo@example.com'}
            )
            client.get = MagicMock(return_value=resp)

            OAuth2Authentication.oauth2_clients[provider] = client
            profile = oauth.get_user_profile()
            self.assertEqual(profile.get('email'), 'userinfo@example.com')
            client.get.assert_called_once()

    # ------------------------------------------------------------------
    # Workload identity tests
    # ------------------------------------------------------------------

    def _test_workload_identity_registration(self):
        """Workload identity must register without client secret or PKCE."""
        app_config.OAUTH2_CONFIG = self._workload_identity_config(
            '/var/run/secrets/tokens/oidc'
        )

        with patch(self._ISFILE_TARGET, return_value=True):
            kwargs = self._registered_kwargs('workload-identity')

        self.assertEqual(kwargs.get('token_endpoint_auth_method'), 'none')
        self.assertIsNone(kwargs.get('client_secret'))

        client_kwargs = kwargs.get('client_kwargs', {})
        self.assertNotIn('code_challenge_method', client_kwargs)
        self.assertNotIn('response_type', client_kwargs)

    def _test_workload_identity_client_assertion(self):
        """Token exchange must include client_assertion fields."""
        from pgadmin.authenticate.oauth2 import OAuth2Authentication

        app_config.OAUTH2_CONFIG = self._workload_identity_config(
            '/var/run/secrets/tokens/oidc'
        )

        client = MagicMock()
        client.authorize_access_token = MagicMock(return_value={
            'access_token': 't',
            'token_type': 'Bearer',
            'userinfo': {'email': 'wi@example.com', 'sub': 'abc'}
        })

        with patch(
            self._REGISTER_TARGET, return_value=client
        ), patch(
            self._ISFILE_TARGET, return_value=True
        ), patch(
            'builtins.open', mock_open(read_data='projected.jwt.token\n')
        ):
            with self.app.test_request_context('/'):
                oauth = OAuth2Authentication()
                oauth.oauth2_current_client = 'workload-identity'

                profile = oauth.get_user_profile()
                self.assertEqual(profile.get('email'), 'wi@example.com')

            # The mocked token file ends with a newline; the expected
            # assertion value does not.
            client.authorize_access_token.assert_called_once_with(
                client_assertion_type=(
                    'urn:ietf:params:oauth:client-assertion-type:jwt-bearer'
                ),
                client_assertion='projected.jwt.token'
            )

    def _test_workload_identity_missing_token_file_fails_fast(self):
        """Workload identity must fail fast if token file missing."""
        app_config.OAUTH2_CONFIG = self._workload_identity_config(
            '/does/not/exist', include_metadata_url=False
        )

        with patch(self._ISFILE_TARGET, return_value=False):
            self._assert_construction_rejected(
                'workload_identity',
                'OAUTH2_WORKLOAD_IDENTITY_TOKEN_FILE',
            )