"""Test config flow.""" from unittest.mock import patch import pytest import voluptuous as vol import yaml from homeassistant import config as hass_config, config_entries, data_entry_flow from homeassistant.components import mqtt from homeassistant.components.hassio import HassioServiceInfo from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component from tests.common import MockConfigEntry @pytest.fixture(autouse=True) def mock_finish_setup(): """Mock out the finish setup method.""" with patch( "homeassistant.components.mqtt.MQTT.async_connect", return_value=True ) as mock_finish: yield mock_finish @pytest.fixture def mock_try_connection(): """Mock the try connection method.""" with patch("homeassistant.components.mqtt.config_flow.try_connection") as mock_try: yield mock_try @pytest.fixture def mock_try_connection_success(): """Mock the try connection method with success.""" _mid = 1 def get_mid(): nonlocal _mid _mid += 1 return _mid def loop_start(): """Simulate connect on loop start.""" mock_client().on_connect(mock_client, None, None, 0) def _subscribe(topic, qos=0): mid = get_mid() mock_client().on_subscribe(mock_client, 0, mid) return (0, mid) def _unsubscribe(topic): mid = get_mid() mock_client().on_unsubscribe(mock_client, 0, mid) return (0, mid) with patch("paho.mqtt.client.Client") as mock_client: mock_client().loop_start = loop_start mock_client().subscribe = _subscribe mock_client().unsubscribe = _unsubscribe yield mock_client() @pytest.fixture def mock_try_connection_time_out(): """Mock the try connection method with a time out.""" # Patch prevent waiting 5 sec for a timeout with patch("paho.mqtt.client.Client") as mock_client, patch( "homeassistant.components.mqtt.config_flow.MQTT_TIMEOUT", 0 ): mock_client().loop_start = lambda *args: 1 yield mock_client() async def test_user_connection_works( hass, mock_try_connection, mock_finish_setup, mqtt_client_mock ): """Test we can finish a config flow.""" mock_try_connection.return_value = True result = await hass.config_entries.flow.async_init( "mqtt", context={"source": config_entries.SOURCE_USER} ) assert result["type"] == "form" result = await hass.config_entries.flow.async_configure( result["flow_id"], {"broker": "127.0.0.1"} ) assert result["type"] == "create_entry" assert result["result"].data == { "broker": "127.0.0.1", "port": 1883, "discovery": True, } # Check we tried the connection assert len(mock_try_connection.mock_calls) == 1 # Check config entry got setup assert len(mock_finish_setup.mock_calls) == 1 async def test_user_connection_fails( hass, mock_try_connection_time_out, mock_finish_setup ): """Test if connection cannot be made.""" result = await hass.config_entries.flow.async_init( "mqtt", context={"source": config_entries.SOURCE_USER} ) assert result["type"] == "form" result = await hass.config_entries.flow.async_configure( result["flow_id"], {"broker": "127.0.0.1"} ) assert result["type"] == "form" assert result["errors"]["base"] == "cannot_connect" # Check we tried the connection assert len(mock_try_connection_time_out.mock_calls) # Check config entry did not setup assert len(mock_finish_setup.mock_calls) == 0 async def test_manual_config_starts_discovery_flow( hass, mock_try_connection, mock_finish_setup, mqtt_client_mock ): """Test manual config initiates a discovery flow.""" # No flows in progress assert hass.config_entries.flow.async_progress() == [] # MQTT config present in yaml config assert await async_setup_component(hass, "mqtt", {"mqtt": {}}) await hass.async_block_till_done() assert len(mock_finish_setup.mock_calls) == 0 # There should now be a discovery flow flows = hass.config_entries.flow.async_progress() assert len(flows) == 1 assert flows[0]["context"]["source"] == "integration_discovery" assert flows[0]["handler"] == "mqtt" assert flows[0]["step_id"] == "broker" async def test_manual_config_set( hass, mock_try_connection, mock_finish_setup, mqtt_client_mock ): """Test manual config does not create an entry, and entry can be setup late.""" # MQTT config present in yaml config assert await async_setup_component(hass, "mqtt", {"mqtt": {"broker": "bla"}}) await hass.async_block_till_done() assert len(mock_finish_setup.mock_calls) == 0 mock_try_connection.return_value = True # Start config flow result = await hass.config_entries.flow.async_init( "mqtt", context={"source": config_entries.SOURCE_USER} ) assert result["type"] == "form" result = await hass.config_entries.flow.async_configure( result["flow_id"], {"broker": "127.0.0.1"} ) assert result["type"] == "create_entry" assert result["result"].data == { "broker": "127.0.0.1", "port": 1883, "discovery": True, } # Check we tried the connection, with precedence for config entry settings mock_try_connection.assert_called_once_with(hass, "127.0.0.1", 1883, None, None) # Check config entry got setup assert len(mock_finish_setup.mock_calls) == 1 async def test_user_single_instance(hass): """Test we only allow a single config flow.""" MockConfigEntry(domain="mqtt").add_to_hass(hass) result = await hass.config_entries.flow.async_init( "mqtt", context={"source": config_entries.SOURCE_USER} ) assert result["type"] == "abort" assert result["reason"] == "single_instance_allowed" async def test_hassio_already_configured(hass): """Test we only allow a single config flow.""" MockConfigEntry(domain="mqtt").add_to_hass(hass) result = await hass.config_entries.flow.async_init( "mqtt", context={"source": config_entries.SOURCE_HASSIO} ) assert result["type"] == "abort" assert result["reason"] == "already_configured" async def test_hassio_ignored(hass: HomeAssistant) -> None: """Test we supervisor discovered instance can be ignored.""" MockConfigEntry( domain=mqtt.DOMAIN, source=config_entries.SOURCE_IGNORE ).add_to_hass(hass) result = await hass.config_entries.flow.async_init( mqtt.DOMAIN, data=HassioServiceInfo( config={ "addon": "Mosquitto", "host": "mock-mosquitto", "port": "1883", "protocol": "3.1.1", } ), context={"source": config_entries.SOURCE_HASSIO}, ) assert result assert result.get("type") == data_entry_flow.RESULT_TYPE_ABORT assert result.get("reason") == "already_configured" async def test_hassio_confirm(hass, mock_try_connection_success, mock_finish_setup): """Test we can finish a config flow.""" mock_try_connection.return_value = True result = await hass.config_entries.flow.async_init( "mqtt", data=HassioServiceInfo( config={ "addon": "Mock Addon", "host": "mock-broker", "port": 1883, "username": "mock-user", "password": "mock-pass", "protocol": "3.1.1", # Set by the addon's discovery, ignored by HA "ssl": False, # Set by the addon's discovery, ignored by HA } ), context={"source": config_entries.SOURCE_HASSIO}, ) assert result["type"] == "form" assert result["step_id"] == "hassio_confirm" assert result["description_placeholders"] == {"addon": "Mock Addon"} mock_try_connection_success.reset_mock() result = await hass.config_entries.flow.async_configure( result["flow_id"], {"discovery": True} ) assert result["type"] == "create_entry" assert result["result"].data == { "broker": "mock-broker", "port": 1883, "username": "mock-user", "password": "mock-pass", "discovery": True, } # Check we tried the connection assert len(mock_try_connection_success.mock_calls) # Check config entry got setup assert len(mock_finish_setup.mock_calls) == 1 async def test_option_flow(hass, mqtt_mock_entry_no_yaml_config, mock_try_connection): """Test config flow options.""" mqtt_mock = await mqtt_mock_entry_no_yaml_config() mock_try_connection.return_value = True config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } mqtt_mock.async_connect.reset_mock() result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "broker" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "options" await hass.async_block_till_done() assert mqtt_mock.async_connect.call_count == 0 result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_DISCOVERY: True, "birth_enable": True, "birth_topic": "ha_state/online", "birth_payload": "online", "birth_qos": 1, "birth_retain": True, "will_enable": True, "will_topic": "ha_state/offline", "will_payload": "offline", "will_qos": 2, "will_retain": True, }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY assert result["data"] is None assert config_entry.data == { mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", mqtt.CONF_DISCOVERY: True, mqtt.CONF_BIRTH_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/online", mqtt.ATTR_PAYLOAD: "online", mqtt.ATTR_QOS: 1, mqtt.ATTR_RETAIN: True, }, mqtt.CONF_WILL_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/offline", mqtt.ATTR_PAYLOAD: "offline", mqtt.ATTR_QOS: 2, mqtt.ATTR_RETAIN: True, }, } await hass.async_block_till_done() assert mqtt_mock.async_connect.call_count == 1 async def test_disable_birth_will( hass, mqtt_mock_entry_no_yaml_config, mock_try_connection ): """Test disabling birth and will.""" mqtt_mock = await mqtt_mock_entry_no_yaml_config() mock_try_connection.return_value = True config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } mqtt_mock.async_connect.reset_mock() result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "broker" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "options" await hass.async_block_till_done() assert mqtt_mock.async_connect.call_count == 0 result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_DISCOVERY: True, "birth_enable": False, "birth_topic": "ha_state/online", "birth_payload": "online", "birth_qos": 1, "birth_retain": True, "will_enable": False, "will_topic": "ha_state/offline", "will_payload": "offline", "will_qos": 2, "will_retain": True, }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY assert result["data"] is None assert config_entry.data == { mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", mqtt.CONF_DISCOVERY: True, mqtt.CONF_BIRTH_MESSAGE: {}, mqtt.CONF_WILL_MESSAGE: {}, } await hass.async_block_till_done() assert mqtt_mock.async_connect.call_count == 1 def get_default(schema, key): """Get default value for key in voluptuous schema.""" for k in schema.keys(): if k == key: if k.default == vol.UNDEFINED: return None return k.default() def get_suggested(schema, key): """Get suggested value for key in voluptuous schema.""" for k in schema.keys(): if k == key: if k.description is None or "suggested_value" not in k.description: return None return k.description["suggested_value"] async def test_option_flow_default_suggested_values( hass, mqtt_mock_entry_no_yaml_config, mock_try_connection_success ): """Test config flow options has default/suggested values.""" await mqtt_mock_entry_no_yaml_config() config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", mqtt.CONF_DISCOVERY: True, mqtt.CONF_BIRTH_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/online", mqtt.ATTR_PAYLOAD: "online", mqtt.ATTR_QOS: 1, mqtt.ATTR_RETAIN: True, }, mqtt.CONF_WILL_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/offline", mqtt.ATTR_PAYLOAD: "offline", mqtt.ATTR_QOS: 2, mqtt.ATTR_RETAIN: False, }, } # Test default/suggested values from config result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "broker" defaults = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } suggested = { mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", } for k, v in defaults.items(): assert get_default(result["data_schema"].schema, k) == v for k, v in suggested.items(): assert get_suggested(result["data_schema"].schema, k) == v result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "us3r", mqtt.CONF_PASSWORD: "p4ss", }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "options" defaults = { mqtt.CONF_DISCOVERY: True, "birth_qos": 1, "birth_retain": True, "will_qos": 2, "will_retain": False, } suggested = { "birth_topic": "ha_state/online", "birth_payload": "online", "will_topic": "ha_state/offline", "will_payload": "offline", } for k, v in defaults.items(): assert get_default(result["data_schema"].schema, k) == v for k, v in suggested.items(): assert get_suggested(result["data_schema"].schema, k) == v result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_DISCOVERY: False, "birth_topic": "ha_state/onl1ne", "birth_payload": "onl1ne", "birth_qos": 2, "birth_retain": False, "will_topic": "ha_state/offl1ne", "will_payload": "offl1ne", "will_qos": 1, "will_retain": True, }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY # Test updated default/suggested values from config result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "broker" defaults = { mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, } suggested = { mqtt.CONF_USERNAME: "us3r", mqtt.CONF_PASSWORD: "p4ss", } for k, v in defaults.items(): assert get_default(result["data_schema"].schema, k) == v for k, v in suggested.items(): assert get_suggested(result["data_schema"].schema, k) == v result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345}, ) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "options" defaults = { mqtt.CONF_DISCOVERY: False, "birth_qos": 2, "birth_retain": False, "will_qos": 1, "will_retain": True, } suggested = { "birth_topic": "ha_state/onl1ne", "birth_payload": "onl1ne", "will_topic": "ha_state/offl1ne", "will_payload": "offl1ne", } for k, v in defaults.items(): assert get_default(result["data_schema"].schema, k) == v for k, v in suggested.items(): assert get_suggested(result["data_schema"].schema, k) == v result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_DISCOVERY: True, "birth_topic": "ha_state/onl1ne", "birth_payload": "onl1ne", "birth_qos": 2, "birth_retain": False, "will_topic": "ha_state/offl1ne", "will_payload": "offl1ne", "will_qos": 1, "will_retain": True, }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY # Make sure all MQTT related jobs are done before ending the test await hass.async_block_till_done() async def test_options_user_connection_fails(hass, mock_try_connection_time_out): """Test if connection cannot be made.""" config_entry = MockConfigEntry(domain=mqtt.DOMAIN) config_entry.add_to_hass(hass) config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == "form" mock_try_connection_time_out.reset_mock() result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={mqtt.CONF_BROKER: "bad-broker", mqtt.CONF_PORT: 2345}, ) assert result["type"] == "form" assert result["errors"]["base"] == "cannot_connect" # Check we tried the connection assert len(mock_try_connection_time_out.mock_calls) # Check config entry did not update assert config_entry.data == { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } async def test_options_bad_birth_message_fails(hass, mock_try_connection): """Test bad birth message.""" config_entry = MockConfigEntry(domain=mqtt.DOMAIN) config_entry.add_to_hass(hass) config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } mock_try_connection.return_value = True result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == "form" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345}, ) assert result["type"] == "form" assert result["step_id"] == "options" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={"birth_topic": "ha_state/online/#"}, ) assert result["type"] == "form" assert result["errors"]["base"] == "bad_birth" # Check config entry did not update assert config_entry.data == { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } async def test_options_bad_will_message_fails(hass, mock_try_connection): """Test bad will message.""" config_entry = MockConfigEntry(domain=mqtt.DOMAIN) config_entry.add_to_hass(hass) config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } mock_try_connection.return_value = True result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == "form" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345}, ) assert result["type"] == "form" assert result["step_id"] == "options" result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={"will_topic": "ha_state/offline/#"}, ) assert result["type"] == "form" assert result["errors"]["base"] == "bad_will" # Check config entry did not update assert config_entry.data == { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } async def test_try_connection_with_advanced_parameters( hass, mock_try_connection_success, tmp_path ): """Test config flow with advanced parameters from config.""" # Mock certificate files certfile = tmp_path / "cert.pem" certfile.write_text("## mock certificate file ##") keyfile = tmp_path / "key.pem" keyfile.write_text("## mock key file ##") config = { "certificate": "auto", "tls_insecure": True, "client_cert": certfile, "client_key": keyfile, } new_yaml_config_file = tmp_path / "configuration.yaml" new_yaml_config = yaml.dump({mqtt.DOMAIN: config}) new_yaml_config_file.write_text(new_yaml_config) assert new_yaml_config_file.read_text() == new_yaml_config with patch.object(hass_config, "YAML_CONFIG_FILE", new_yaml_config_file): await async_setup_component(hass, mqtt.DOMAIN, {mqtt.DOMAIN: config}) await hass.async_block_till_done() config_entry = MockConfigEntry(domain=mqtt.DOMAIN) config_entry.add_to_hass(hass) config_entry.data = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", mqtt.CONF_DISCOVERY: True, mqtt.CONF_BIRTH_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/online", mqtt.ATTR_PAYLOAD: "online", mqtt.ATTR_QOS: 1, mqtt.ATTR_RETAIN: True, }, mqtt.CONF_WILL_MESSAGE: { mqtt.ATTR_TOPIC: "ha_state/offline", mqtt.ATTR_PAYLOAD: "offline", mqtt.ATTR_QOS: 2, mqtt.ATTR_RETAIN: False, }, } # Test default/suggested values from config result = await hass.config_entries.options.async_init(config_entry.entry_id) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "broker" defaults = { mqtt.CONF_BROKER: "test-broker", mqtt.CONF_PORT: 1234, } suggested = { mqtt.CONF_USERNAME: "user", mqtt.CONF_PASSWORD: "pass", } for k, v in defaults.items(): assert get_default(result["data_schema"].schema, k) == v for k, v in suggested.items(): assert get_suggested(result["data_schema"].schema, k) == v result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ mqtt.CONF_BROKER: "another-broker", mqtt.CONF_PORT: 2345, mqtt.CONF_USERNAME: "us3r", mqtt.CONF_PASSWORD: "p4ss", }, ) assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["step_id"] == "options" # check if the username and password was set from config flow and not from configuration.yaml assert mock_try_connection_success.username_pw_set.mock_calls[0][1] == ( "us3r", "p4ss", ) # check if tls_insecure_set is called assert mock_try_connection_success.tls_insecure_set.mock_calls[0][1] == (True,) # check if the certificate settings were set from configuration.yaml assert mock_try_connection_success.tls_set.mock_calls[0].kwargs[ "certfile" ] == str(certfile) assert mock_try_connection_success.tls_set.mock_calls[0].kwargs[ "keyfile" ] == str(keyfile)