########################################################################## # # pgAdmin 4 - PostgreSQL Tools # # Copyright (C) 2013 - 2021, The pgAdmin Development Team # This software is released under the PostgreSQL Licence # ########################################################################## """A blueprint module implementing the Oauth2 authentication.""" import config from authlib.integrations.flask_client import OAuth from flask import current_app, url_for, session, request,\ redirect, Flask, flash from flask_babelex import gettext from flask_security import login_user, current_user from flask_security.utils import get_post_logout_redirect, \ get_post_login_redirect, logout_user from pgadmin.authenticate.internal import BaseAuthentication from pgadmin.model import User from pgadmin.tools.user_management import create_user from pgadmin.utils.constants import OAUTH2 from pgadmin.utils import PgAdminModule from pgadmin.utils.csrf import pgCSRFProtect from pgadmin.model import db OAUTH2_LOGOUT = 'oauth2.logout' OAUTH2_AUTHORIZE = 'oauth2.authorize' class Oauth2Module(PgAdminModule): def register(self, app, options, first_registration=False): # Do not look for the sub_modules, # instead call blueprint.register(...) directly super(PgAdminModule, self).register(app, options, first_registration) def get_exposed_url_endpoints(self): return [OAUTH2_AUTHORIZE, OAUTH2_LOGOUT] def init_app(app): MODULE_NAME = 'oauth2' blueprint = Oauth2Module(MODULE_NAME, __name__, static_url_path='') @blueprint.route('/authorize', endpoint="authorize", methods=['GET', 'POST']) @pgCSRFProtect.exempt def oauth_authorize(): auth_obj = session['auth_obj'] auth_obj.set_current_source(auth_obj.source.get_source_name()) status, msg = auth_obj.login() if status: session['auth_source_manager'] = auth_obj.as_dict() if 'auth_obj' in session: session.pop('auth_obj') return redirect(get_post_login_redirect()) if 'auth_obj' in session: session.pop('auth_obj') logout_user() flash(msg, 'danger') return redirect(get_post_login_redirect()) @blueprint.route('/logout', endpoint="logout", methods=['GET', 'POST']) @pgCSRFProtect.exempt def oauth_logout(): if not current_user.is_authenticated: return redirect(get_post_logout_redirect()) for key in list(session.keys()): session.pop(key) logout_user() return redirect(get_post_logout_redirect()) app.register_blueprint(blueprint) app.login_manager.logout_view = OAUTH2_LOGOUT class OAuth2Authentication(BaseAuthentication): """OAuth Authentication Class""" LOGOUT_VIEW = OAUTH2_LOGOUT oauth_obj = OAuth(Flask(__name__)) oauth2_clients = {} oauth2_config = {} def __init__(self): for oauth2_config in config.OAUTH2_CONFIG: OAuth2Authentication.oauth2_config[ oauth2_config['OAUTH2_NAME']] = oauth2_config OAuth2Authentication.oauth2_clients[ oauth2_config['OAUTH2_NAME'] ] = OAuth2Authentication.oauth_obj.register( name=oauth2_config['OAUTH2_NAME'], client_id=oauth2_config['OAUTH2_CLIENT_ID'], client_secret=oauth2_config['OAUTH2_CLIENT_SECRET'], access_token_url=oauth2_config['OAUTH2_TOKEN_URL'], authorize_url=oauth2_config['OAUTH2_AUTHORIZATION_URL'], api_base_url=oauth2_config['OAUTH2_API_BASE_URL'], client_kwargs={'scope': oauth2_config.get( 'OAUTH2_SCOPE', 'email profile')}, ) def get_source_name(self): return OAUTH2 def get_friendly_name(self): return self.oauth2_config[self.oauth2_current_client]['OAUTH2_NAME'] def validate(self, form): return True def login(self, form): profile = self.get_user_profile() if 'email' not in profile or not profile['email']: current_app.logger.exception( "An email id is required to login into pgAdmin. " "Please update your Oauth2 profile." ) return False, gettext( "An email id is required to login into pgAdmin. " "Please update your Oauth2 profile.") user, msg = self.__auto_create_user(profile) if user: user = db.session.query(User).filter_by( username=profile['email'], auth_source=OAUTH2).first() current_app.login_manager.logout_view = \ OAuth2Authentication.LOGOUT_VIEW return login_user(user), None return False, msg def get_user_profile(self): session['oauth2_token'] = self.oauth2_clients[ self.oauth2_current_client].authorize_access_token() session['pass_enc_key'] = session['oauth2_token']['access_token'] resp = self.oauth2_clients[self.oauth2_current_client].get( self.oauth2_config[ self.oauth2_current_client]['OAUTH2_USERINFO_ENDPOINT'], token=session['oauth2_token'] ) resp.raise_for_status() return resp.json() def authenticate(self, form): self.oauth2_current_client = request.form['oauth2_button'] redirect_url = url_for(OAUTH2_AUTHORIZE, _external=True) if self.oauth2_current_client not in self.oauth2_clients: return False, gettext( "Please set the configuration parameters properly.") return False, self.oauth2_clients[ self.oauth2_current_client].authorize_redirect(redirect_url) def __auto_create_user(self, resp): if config.OAUTH2_AUTO_CREATE_USER: user = User.query.filter_by(username=resp['email'], auth_source=OAUTH2).first() if not user: return create_user({ 'username': resp['email'], 'email': resp['email'], 'role': 2, 'active': True, 'auth_source': OAUTH2 }) return True, {'username': resp['email']}