mirror of
https://github.com/esphome/esphome.git
synced 2025-02-16 01:48:14 +00:00
safe file writer
This commit is contained in:
parent
95c7236faa
commit
f060c4020c
43
esphome/dashboard/util/file.py
Normal file
43
esphome/dashboard/util/file.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# from https://github.com/home-assistant/core/blob/dev/homeassistant/util/file.py
|
||||||
|
def write_utf8_file(
|
||||||
|
filename: Path,
|
||||||
|
utf8_data: str,
|
||||||
|
private: bool = False,
|
||||||
|
) -> None:
|
||||||
|
"""Write a file and rename it into place.
|
||||||
|
|
||||||
|
Writes all or nothing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
tmp_filename = ""
|
||||||
|
try:
|
||||||
|
# Modern versions of Python tempfile create this file with mode 0o600
|
||||||
|
with tempfile.NamedTemporaryFile(
|
||||||
|
mode="w", encoding="utf-8", dir=os.path.dirname(filename), delete=False
|
||||||
|
) as fdesc:
|
||||||
|
fdesc.write(utf8_data)
|
||||||
|
tmp_filename = fdesc.name
|
||||||
|
if not private:
|
||||||
|
os.fchmod(fdesc.fileno(), 0o644)
|
||||||
|
os.replace(tmp_filename, filename)
|
||||||
|
finally:
|
||||||
|
if os.path.exists(tmp_filename):
|
||||||
|
try:
|
||||||
|
os.remove(tmp_filename)
|
||||||
|
except OSError as err:
|
||||||
|
# If we are cleaning up then something else went wrong, so
|
||||||
|
# we should suppress likely follow-on errors in the cleanup
|
||||||
|
_LOGGER.error(
|
||||||
|
"File replacement cleanup failed for %s while saving %s: %s",
|
||||||
|
tmp_filename,
|
||||||
|
filename,
|
||||||
|
err,
|
||||||
|
)
|
0
tests/dashboard/__init__.py
Normal file
0
tests/dashboard/__init__.py
Normal file
0
tests/dashboard/util/__init__.py
Normal file
0
tests/dashboard/util/__init__.py
Normal file
48
tests/dashboard/util/test_file.py
Normal file
48
tests/dashboard/util/test_file.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import py
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from esphome.dashboard.util.file import write_utf8_file
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_utf8_file(tmp_path: Path) -> None:
|
||||||
|
write_utf8_file(tmp_path.joinpath("foo.txt"), "foo")
|
||||||
|
assert tmp_path.joinpath("foo.txt").read_text() == "foo"
|
||||||
|
|
||||||
|
with pytest.raises(OSError):
|
||||||
|
write_utf8_file(Path("/not-writable"), "bar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_utf8_file_fails_at_rename(
|
||||||
|
tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that if rename fails not not remove, we do not log the failed cleanup."""
|
||||||
|
test_dir = tmpdir.mkdir("files")
|
||||||
|
test_file = Path(test_dir / "test.json")
|
||||||
|
|
||||||
|
with pytest.raises(OSError), patch(
|
||||||
|
"esphome.dashboard.util.file.os.replace", side_effect=OSError
|
||||||
|
):
|
||||||
|
write_utf8_file(test_file, '{"some":"data"}', False)
|
||||||
|
|
||||||
|
assert not os.path.exists(test_file)
|
||||||
|
|
||||||
|
assert "File replacement cleanup failed" not in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_utf8_file_fails_at_rename_and_remove(
|
||||||
|
tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that if rename and remove both fail, we log the failed cleanup."""
|
||||||
|
test_dir = tmpdir.mkdir("files")
|
||||||
|
test_file = Path(test_dir / "test.json")
|
||||||
|
|
||||||
|
with pytest.raises(OSError), patch(
|
||||||
|
"esphome.dashboard.util.file.os.remove", side_effect=OSError
|
||||||
|
), patch("esphome.dashboard.util.file.os.replace", side_effect=OSError):
|
||||||
|
write_utf8_file(test_file, '{"some":"data"}', False)
|
||||||
|
|
||||||
|
assert "File replacement cleanup failed" in caplog.text
|
Loading…
x
Reference in New Issue
Block a user