Added working support for private storage (#16903)
* Fixed file corruption bugs in private storage code. * Restoring fixed test case. * Implemented test suite for utils/json.py * Added new unit test cases for util/json.py * Dixed formatting nags * Fixed more nags from the Hound * Added doc strings to some very short functions * Fixing lint's complains about my choice of parts of speach. Sigh. * Moved atomic save operations down into util/json.py so that all benefit. Added extra clean-up code to ensure that temporary files are removed in case of errors. Updated emulated_hue unit tests to avoid errors. * Apparently 'e' is not allows as a variable name for an exception...pull/17059/head^2
parent
1decba0052
commit
b0b3620b2b
|
@ -3,6 +3,8 @@ import logging
|
|||
from typing import Union, List, Dict
|
||||
|
||||
import json
|
||||
import os
|
||||
from os import O_WRONLY, O_CREAT, O_TRUNC
|
||||
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
|
@ -44,10 +46,14 @@ def save_json(filename: str, data: Union[List, Dict],
|
|||
|
||||
Returns True on success.
|
||||
"""
|
||||
tmp_filename = filename + "__TEMP__"
|
||||
try:
|
||||
json_data = json.dumps(data, sort_keys=True, indent=4)
|
||||
with open(filename, 'w', encoding='utf-8') as fdesc:
|
||||
mode = 0o600 if private else 0o644
|
||||
with open(os.open(tmp_filename, O_WRONLY | O_CREAT | O_TRUNC, mode),
|
||||
'w', encoding='utf-8') as fdesc:
|
||||
fdesc.write(json_data)
|
||||
os.replace(tmp_filename, filename)
|
||||
except TypeError as error:
|
||||
_LOGGER.exception('Failed to serialize to JSON: %s',
|
||||
filename)
|
||||
|
@ -56,3 +62,11 @@ def save_json(filename: str, data: Union[List, Dict],
|
|||
_LOGGER.exception('Saving JSON file failed: %s',
|
||||
filename)
|
||||
raise WriteError(error)
|
||||
finally:
|
||||
if os.path.exists(tmp_filename):
|
||||
try:
|
||||
os.remove(tmp_filename)
|
||||
except OSError as err:
|
||||
# If we are cleaning up then something else went wrong, so
|
||||
# we should suppress likely follow-on errors in the cleanup
|
||||
_LOGGER.error("JSON replacement cleanup failed: %s", err)
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
"""Test the Emulated Hue component."""
|
||||
import json
|
||||
|
||||
from unittest.mock import patch, Mock, mock_open
|
||||
from unittest.mock import patch, Mock, mock_open, MagicMock
|
||||
|
||||
from homeassistant.components.emulated_hue import Config
|
||||
|
||||
|
||||
def test_config_google_home_entity_id_to_number():
|
||||
"""Test config adheres to the type."""
|
||||
conf = Config(Mock(), {
|
||||
mock_hass = Mock()
|
||||
mock_hass.config.path = MagicMock("path", return_value="test_path")
|
||||
conf = Config(mock_hass, {
|
||||
'type': 'google_home'
|
||||
})
|
||||
|
||||
|
@ -16,29 +18,33 @@ def test_config_google_home_entity_id_to_number():
|
|||
handle = mop()
|
||||
|
||||
with patch('homeassistant.util.json.open', mop, create=True):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'1': 'light.test2',
|
||||
'2': 'light.test',
|
||||
}
|
||||
with patch('homeassistant.util.json.os.open', return_value=0):
|
||||
with patch('homeassistant.util.json.os.replace'):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'1': 'light.test2',
|
||||
'2': 'light.test',
|
||||
}
|
||||
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 1
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 1
|
||||
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
|
||||
entity_id = conf.number_to_entity_id('1')
|
||||
assert entity_id == 'light.test2'
|
||||
entity_id = conf.number_to_entity_id('1')
|
||||
assert entity_id == 'light.test2'
|
||||
|
||||
|
||||
def test_config_google_home_entity_id_to_number_altered():
|
||||
"""Test config adheres to the type."""
|
||||
conf = Config(Mock(), {
|
||||
mock_hass = Mock()
|
||||
mock_hass.config.path = MagicMock("path", return_value="test_path")
|
||||
conf = Config(mock_hass, {
|
||||
'type': 'google_home'
|
||||
})
|
||||
|
||||
|
@ -46,29 +52,33 @@ def test_config_google_home_entity_id_to_number_altered():
|
|||
handle = mop()
|
||||
|
||||
with patch('homeassistant.util.json.open', mop, create=True):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '22'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'21': 'light.test2',
|
||||
'22': 'light.test',
|
||||
}
|
||||
with patch('homeassistant.util.json.os.open', return_value=0):
|
||||
with patch('homeassistant.util.json.os.replace'):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '22'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'21': 'light.test2',
|
||||
'22': 'light.test',
|
||||
}
|
||||
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '22'
|
||||
assert handle.write.call_count == 1
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '22'
|
||||
assert handle.write.call_count == 1
|
||||
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '21'
|
||||
assert handle.write.call_count == 1
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '21'
|
||||
assert handle.write.call_count == 1
|
||||
|
||||
entity_id = conf.number_to_entity_id('21')
|
||||
assert entity_id == 'light.test2'
|
||||
entity_id = conf.number_to_entity_id('21')
|
||||
assert entity_id == 'light.test2'
|
||||
|
||||
|
||||
def test_config_google_home_entity_id_to_number_empty():
|
||||
"""Test config adheres to the type."""
|
||||
conf = Config(Mock(), {
|
||||
mock_hass = Mock()
|
||||
mock_hass.config.path = MagicMock("path", return_value="test_path")
|
||||
conf = Config(mock_hass, {
|
||||
'type': 'google_home'
|
||||
})
|
||||
|
||||
|
@ -76,23 +86,25 @@ def test_config_google_home_entity_id_to_number_empty():
|
|||
handle = mop()
|
||||
|
||||
with patch('homeassistant.util.json.open', mop, create=True):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'1': 'light.test',
|
||||
}
|
||||
with patch('homeassistant.util.json.os.open', return_value=0):
|
||||
with patch('homeassistant.util.json.os.replace'):
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
assert json.loads(handle.write.mock_calls[0][1][0]) == {
|
||||
'1': 'light.test',
|
||||
}
|
||||
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
number = conf.entity_id_to_number('light.test')
|
||||
assert number == '1'
|
||||
assert handle.write.call_count == 1
|
||||
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 2
|
||||
number = conf.entity_id_to_number('light.test2')
|
||||
assert number == '2'
|
||||
assert handle.write.call_count == 2
|
||||
|
||||
entity_id = conf.number_to_entity_id('2')
|
||||
assert entity_id == 'light.test2'
|
||||
entity_id = conf.number_to_entity_id('2')
|
||||
assert entity_id == 'light.test2'
|
||||
|
||||
|
||||
def test_config_alexa_entity_id_to_number():
|
||||
|
|
|
@ -15,6 +15,7 @@ from tests.common import async_fire_time_changed, mock_coro
|
|||
MOCK_VERSION = 1
|
||||
MOCK_KEY = 'storage-test'
|
||||
MOCK_DATA = {'hello': 'world'}
|
||||
MOCK_DATA2 = {'goodbye': 'cruel world'}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
"""Test Home Assistant json utility functions."""
|
||||
import os
|
||||
import unittest
|
||||
import sys
|
||||
from tempfile import mkdtemp
|
||||
|
||||
from homeassistant.util.json import (SerializationError,
|
||||
load_json, save_json)
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
# Test data that can be saved as JSON
|
||||
TEST_JSON_A = {"a": 1, "B": "two"}
|
||||
TEST_JSON_B = {"a": "one", "B": 2}
|
||||
# Test data that can not be saved as JSON (keys must be strings)
|
||||
TEST_BAD_OBJECT = {("A",): 1}
|
||||
# Test data that can not be loaded as JSON
|
||||
TEST_BAD_SERIALIED = "THIS IS NOT JSON\n"
|
||||
|
||||
|
||||
class TestJSON(unittest.TestCase):
|
||||
"""Test util.json save and load."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up for tests."""
|
||||
self.tmp_dir = mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
for fname in os.listdir(self.tmp_dir):
|
||||
os.remove(os.path.join(self.tmp_dir, fname))
|
||||
os.rmdir(self.tmp_dir)
|
||||
|
||||
def _path_for(self, leaf_name):
|
||||
return os.path.join(self.tmp_dir, leaf_name+".json")
|
||||
|
||||
def test_save_and_load(self):
|
||||
"""Test saving and loading back."""
|
||||
fname = self._path_for("test1")
|
||||
save_json(fname, TEST_JSON_A)
|
||||
data = load_json(fname)
|
||||
self.assertEqual(data, TEST_JSON_A)
|
||||
|
||||
# Skipped on Windows
|
||||
@unittest.skipIf(sys.platform.startswith('win'),
|
||||
"private permissions not supported on Windows")
|
||||
def test_save_and_load_private(self):
|
||||
"""Test we can load private files and that they are protected."""
|
||||
fname = self._path_for("test2")
|
||||
save_json(fname, TEST_JSON_A, private=True)
|
||||
data = load_json(fname)
|
||||
self.assertEqual(data, TEST_JSON_A)
|
||||
stats = os.stat(fname)
|
||||
self.assertEqual(stats.st_mode & 0o77, 0)
|
||||
|
||||
def test_overwrite_and_reload(self):
|
||||
"""Test that we can overwrite an existing file and read back."""
|
||||
fname = self._path_for("test3")
|
||||
save_json(fname, TEST_JSON_A)
|
||||
save_json(fname, TEST_JSON_B)
|
||||
data = load_json(fname)
|
||||
self.assertEqual(data, TEST_JSON_B)
|
||||
|
||||
def test_save_bad_data(self):
|
||||
"""Test error from trying to save unserialisable data."""
|
||||
fname = self._path_for("test4")
|
||||
with self.assertRaises(SerializationError):
|
||||
save_json(fname, TEST_BAD_OBJECT)
|
||||
|
||||
def test_load_bad_data(self):
|
||||
"""Test error from trying to load unserialisable data."""
|
||||
fname = self._path_for("test5")
|
||||
with open(fname, "w") as fh:
|
||||
fh.write(TEST_BAD_SERIALIED)
|
||||
with self.assertRaises(HomeAssistantError):
|
||||
load_json(fname)
|
Loading…
Reference in New Issue