"""Test Home Assistant package util methods."""
import asyncio
import logging
import os
import sys
from subprocess import PIPE
from unittest.mock import MagicMock, call, patch

import pkg_resources
import pytest

import homeassistant.util.package as package

# Directory holding the test resources, resolved relative to this file.
RESOURCE_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', 'resources'))

# Requirement strings shared by the tests below.
TEST_EXIST_REQ = 'pip>=7.0.0'
TEST_NEW_REQ = 'pyhelloworld3==1.0.0'
TEST_ZIP_REQ = 'file://{}#{}' \
    .format(os.path.join(RESOURCE_DIR, 'pyhelloworld3.zip'), TEST_NEW_REQ)


@pytest.fixture
def mock_sys():
    """Mock sys as seen by the package module.

    The replacement is created with ``spec=object`` and only ``executable``
    is pre-set; tests assign any further attribute they need (for example
    ``platform``).
    """
    with patch('homeassistant.util.package.sys', spec=object) as sys_mock:
        sys_mock.executable = 'python3'
        yield sys_mock


@pytest.fixture
def mock_exists():
    """Mock package_loadable so that it reports False by default."""
    with patch('homeassistant.util.package.package_loadable') as mock:
        mock.return_value = False
        yield mock


@pytest.fixture
def deps_dir():
    """Return path to deps directory."""
    return os.path.abspath('/deps_dir')


@pytest.fixture
def lib_dir(deps_dir):
    """Return path to lib directory."""
    return os.path.join(deps_dir, 'lib_dir')


@pytest.fixture
def mock_popen(lib_dir):
    """Return a Popen mock.

    The mocked process exits with return code 0 and ``communicate()``
    yields ``lib_dir`` encoded as UTF-8 on stdout and ``b'error'`` on stderr.
    """
    with patch('homeassistant.util.package.Popen') as popen_mock:
        popen_mock.return_value.communicate.return_value = (
            bytes(lib_dir, 'utf-8'), b'error')
        popen_mock.return_value.returncode = 0
        yield popen_mock


@pytest.fixture
def mock_env_copy():
    """Mock os.environ.copy.

    Every call returns a *fresh* empty dict. A single shared ``return_value``
    dict would be the very same object in the test and in the code under
    test, which makes every ``env=env`` assertion compare a dict with itself
    and therefore pass no matter what the code puts into the environment.
    """
    with patch('homeassistant.util.package.os.environ.copy') as env_copy:
        env_copy.side_effect = dict
        yield env_copy


@pytest.fixture
def mock_venv():
    """Mock homeassistant.util.package.is_virtual_env to return True."""
    with patch('homeassistant.util.package.is_virtual_env') as mock:
        mock.return_value = True
        yield mock


# NOTE(review): asyncio.coroutine / ``yield from`` are kept to match the
# rest of this file; they would need to become ``async def`` / ``await`` on
# Python versions that no longer ship asyncio.coroutine.
@asyncio.coroutine
def mock_async_subprocess():
    """Return an async Popen mock.

    The returned MagicMock exposes a ``communicate`` coroutine that resolves
    to ``(b'/deps_dir/lib_dir', None)``.
    """
    async_popen = MagicMock()

    @asyncio.coroutine
    def communicate(input=None):
        """Communicate mock."""
        stdout = bytes('/deps_dir/lib_dir', 'utf-8')
        return (stdout, None)

    async_popen.communicate = communicate
    return async_popen


def test_install_existing_package(mock_exists, mock_popen):
    """Test an install attempt on an existing package."""
    mock_exists.return_value = True
    assert package.install_package(TEST_EXIST_REQ)
    assert mock_exists.call_count == 1
    assert mock_exists.call_args == call(TEST_EXIST_REQ)
    # No subprocess communication may happen for a loadable package.
    assert mock_popen.return_value.communicate.call_count == 0


def test_install(mock_sys, mock_exists, mock_popen, mock_env_copy, mock_venv):
    """Test an install attempt on a package that doesn't exist."""
    env = mock_env_copy()
    assert package.install_package(TEST_NEW_REQ, False)
    assert mock_exists.call_count == 1
    assert mock_popen.call_count == 1
    assert (
        mock_popen.call_args ==
        call([
            mock_sys.executable, '-m', 'pip', 'install', '--quiet',
            TEST_NEW_REQ
        ], stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
    )
    assert mock_popen.return_value.communicate.call_count == 1


def test_install_upgrade(
        mock_sys, mock_exists, mock_popen, mock_env_copy, mock_venv):
    """Test an upgrade attempt on a package."""
    env = mock_env_copy()
    assert package.install_package(TEST_NEW_REQ)
    assert mock_exists.call_count == 1
    assert mock_popen.call_count == 1
    assert (
        mock_popen.call_args ==
        call([
            mock_sys.executable, '-m', 'pip', 'install', '--quiet',
            TEST_NEW_REQ, '--upgrade'
        ], stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
    )
    assert mock_popen.return_value.communicate.call_count == 1


def test_install_target(
        mock_sys, mock_exists, mock_popen, mock_env_copy, mock_venv):
    """Test an install with a target."""
    target = 'target_folder'
    # Expected environment: an empty copy plus the user base for the target.
    env = mock_env_copy()
    env['PYTHONUSERBASE'] = os.path.abspath(target)
    mock_venv.return_value = False
    mock_sys.platform = 'linux'
    args = [
        mock_sys.executable, '-m', 'pip', 'install', '--quiet',
        TEST_NEW_REQ, '--user', '--prefix=']

    assert package.install_package(TEST_NEW_REQ, False, target=target)
    assert mock_exists.call_count == 1
    assert mock_popen.call_count == 1
    assert (
        mock_popen.call_args ==
        call(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
    )
    assert mock_popen.return_value.communicate.call_count == 1


def test_install_target_venv(
        mock_sys, mock_exists, mock_popen, mock_env_copy, mock_venv):
    """Test an install with a target in a virtual environment."""
    target = 'target_folder'
    # mock_venv reports a virtual environment, so this must be rejected.
    with pytest.raises(AssertionError):
        package.install_package(TEST_NEW_REQ, False, target=target)


def test_install_error(caplog, mock_sys, mock_exists, mock_popen, mock_venv):
    """Test that a failing install returns False and logs one error."""
    caplog.set_level(logging.WARNING)
    # Simulate pip exiting with a non-zero status.
    mock_popen.return_value.returncode = 1
    assert not package.install_package(TEST_NEW_REQ)
    assert len(caplog.records) == 1
    for record in caplog.records:
        assert record.levelname == 'ERROR'


def test_install_constraint(
        mock_sys, mock_exists, mock_popen, mock_env_copy, mock_venv):
    """Test install with constraint file on not installed package."""
    env = mock_env_copy()
    constraints = 'constraints_file.txt'
    assert package.install_package(
        TEST_NEW_REQ, False, constraints=constraints)
    assert mock_exists.call_count == 1
    assert mock_popen.call_count == 1
    assert (
        mock_popen.call_args ==
        call([
            mock_sys.executable, '-m', 'pip', 'install', '--quiet',
            TEST_NEW_REQ, '--constraint', constraints
        ], stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
    )
    assert mock_popen.return_value.communicate.call_count == 1


def test_check_package_global():
    """Test for an installed package."""
    # Use whichever distribution comes first in the working set.
    installed_package = list(pkg_resources.working_set)[0].project_name
    assert package.package_loadable(installed_package)


def test_check_package_zip():
    """Test that a zip file requirement is not reported as loadable."""
    assert not package.package_loadable(TEST_ZIP_REQ)


@asyncio.coroutine
def test_async_get_user_site(mock_env_copy):
    """Test async get user site directory."""
    deps_dir = '/deps_dir'
    # Expected environment: an empty copy plus the user base for deps_dir.
    env = mock_env_copy()
    env['PYTHONUSERBASE'] = os.path.abspath(deps_dir)
    args = [sys.executable, '-m', 'site', '--user-site']
    with patch('homeassistant.util.package.asyncio.create_subprocess_exec',
               return_value=mock_async_subprocess()) as popen_mock:
        ret = yield from package.async_get_user_site(deps_dir)
    assert popen_mock.call_count == 1
    assert popen_mock.call_args == call(
        *args, stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
        env=env)
    assert ret == os.path.join(deps_dir, 'lib_dir')


def test_package_loadable_installed_twice():
    """Test that a package is loadable when installed twice.

    If a package is installed twice, only the first version will be imported.
    Test that package_loadable will only compare with the first package.
    """
    v1 = pkg_resources.Distribution(project_name='hello', version='1.0.0')
    v2 = pkg_resources.Distribution(project_name='hello', version='2.0.0')

    # Only the old version is found.
    with patch('pkg_resources.find_distributions', side_effect=[[v1]]):
        assert not package.package_loadable('hello==2.0.0')

    # Old version first, requested version second.
    with patch('pkg_resources.find_distributions', side_effect=[[v1], [v2]]):
        assert not package.package_loadable('hello==2.0.0')

    # Requested version first, old version second.
    with patch('pkg_resources.find_distributions', side_effect=[[v2], [v1]]):
        assert package.package_loadable('hello==2.0.0')

    # Only the requested version is found.
    with patch('pkg_resources.find_distributions', side_effect=[[v2]]):
        assert package.package_loadable('hello==2.0.0')

    # Same as above, with different capitalisation of the project name.
    with patch('pkg_resources.find_distributions', side_effect=[[v2]]):
        assert package.package_loadable('Hello==2.0.0')