1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-26 12:43:48 +00:00

[core] os.path -> Path (#10654)

Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Jesse Hills
2025-09-20 00:59:48 +12:00
committed by GitHub
parent de617c85c7
commit 9ea3643b74
57 changed files with 808 additions and 938 deletions

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import codecs
from contextlib import suppress
import ipaddress
import logging
@@ -8,6 +7,7 @@ import os
from pathlib import Path
import platform
import re
import shutil
import tempfile
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@@ -140,16 +140,16 @@ def run_system_command(*args):
return rc, stdout, stderr
def mkdir_p(path):
def mkdir_p(path: Path):
if not path:
# Empty path - means create current dir
return
try:
os.makedirs(path)
path.mkdir(parents=True, exist_ok=True)
except OSError as err:
import errno
if err.errno == errno.EEXIST and os.path.isdir(path):
if err.errno == errno.EEXIST and path.is_dir():
pass
else:
from esphome.core import EsphomeError
@@ -331,16 +331,15 @@ def is_ha_addon():
return get_bool_env("ESPHOME_IS_HA_ADDON")
def walk_files(path):
def walk_files(path: Path):
for root, _, files in os.walk(path):
for name in files:
yield os.path.join(root, name)
yield Path(root) / name
def read_file(path):
def read_file(path: Path) -> str:
try:
with codecs.open(path, "r", encoding="utf-8") as f_handle:
return f_handle.read()
return path.read_text(encoding="utf-8")
except OSError as err:
from esphome.core import EsphomeError
@@ -351,13 +350,15 @@ def read_file(path):
raise EsphomeError(f"Error reading file {path}: {err}") from err
def _write_file(path: Path | str, text: str | bytes):
def _write_file(
path: Path,
text: str | bytes,
private: bool = False,
) -> None:
"""Atomically writes `text` to the given path.
Automatically creates all parent directories.
"""
if not isinstance(path, Path):
path = Path(path)
data = text
if isinstance(text, str):
data = text.encode()
@@ -365,42 +366,54 @@ def _write_file(path: Path | str, text: str | bytes):
directory = path.parent
directory.mkdir(exist_ok=True, parents=True)
tmp_path = None
tmp_filename: Path | None = None
missing_fchmod = False
try:
# Modern versions of Python tempfile create this file with mode 0o600
with tempfile.NamedTemporaryFile(
mode="wb", dir=directory, delete=False
) as f_handle:
tmp_path = f_handle.name
f_handle.write(data)
# Newer tempfile implementations create the file with mode 0o600
os.chmod(tmp_path, 0o644)
# If destination exists, will be overwritten
os.replace(tmp_path, path)
tmp_filename = Path(f_handle.name)
if not private:
try:
os.fchmod(f_handle.fileno(), 0o644)
except AttributeError:
# os.fchmod is not available on Windows
missing_fchmod = True
shutil.move(tmp_filename, path)
if missing_fchmod:
path.chmod(0o644)
finally:
if tmp_path is not None and os.path.exists(tmp_path):
if tmp_filename and tmp_filename.exists():
try:
os.remove(tmp_path)
tmp_filename.unlink()
except OSError as err:
_LOGGER.error("Write file cleanup failed: %s", err)
# If we are cleaning up then something else went wrong, so
# we should suppress likely follow-on errors in the cleanup
_LOGGER.error(
"File replacement cleanup failed for %s while saving %s: %s",
tmp_filename,
path,
err,
)
def write_file(path: Path | str, text: str):
def write_file(path: Path, text: str | bytes, private: bool = False) -> None:
try:
_write_file(path, text)
_write_file(path, text, private=private)
except OSError as err:
from esphome.core import EsphomeError
raise EsphomeError(f"Could not write file at {path}") from err
def write_file_if_changed(path: Path | str, text: str) -> bool:
def write_file_if_changed(path: Path, text: str) -> bool:
"""Write text to the given path, but not if the contents match already.
Returns true if the file was changed.
"""
if not isinstance(path, Path):
path = Path(path)
src_content = None
if path.is_file():
src_content = read_file(path)
@@ -410,12 +423,10 @@ def write_file_if_changed(path: Path | str, text: str) -> bool:
return True
def copy_file_if_changed(src: os.PathLike, dst: os.PathLike) -> None:
import shutil
def copy_file_if_changed(src: Path, dst: Path) -> None:
if file_compare(src, dst):
return
mkdir_p(os.path.dirname(dst))
dst.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copyfile(src, dst)
except OSError as err:
@@ -440,12 +451,12 @@ def list_starts_with(list_, sub):
return len(sub) <= len(list_) and all(list_[i] == x for i, x in enumerate(sub))
def file_compare(path1: os.PathLike, path2: os.PathLike) -> bool:
def file_compare(path1: Path, path2: Path) -> bool:
"""Return True if the files path1 and path2 have the same contents."""
import stat
try:
stat1, stat2 = os.stat(path1), os.stat(path2)
stat1, stat2 = path1.stat(), path2.stat()
except OSError:
# File doesn't exist or another error -> not equal
return False
@@ -462,7 +473,7 @@ def file_compare(path1: os.PathLike, path2: os.PathLike) -> bool:
bufsize = 8 * 1024
# Read files in blocks until a mismatch is found
with open(path1, "rb") as fh1, open(path2, "rb") as fh2:
with path1.open("rb") as fh1, path2.open("rb") as fh2:
while True:
blob1, blob2 = fh1.read(bufsize), fh2.read(bufsize)
if blob1 != blob2: