mirror of
https://github.com/esphome/esphome.git
synced 2025-09-06 21:32:21 +01:00
597 lines
18 KiB
Python
597 lines
18 KiB
Python
import logging
|
|
import socket
|
|
from unittest.mock import patch
|
|
|
|
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
|
from hypothesis import given
|
|
from hypothesis.strategies import ip_addresses
|
|
import pytest
|
|
|
|
from esphome import helpers
|
|
from esphome.core import EsphomeError
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"preferred_string, current_strings, expected",
|
|
(
|
|
("foo", [], "foo"),
|
|
# TODO: Should this actually start at 1?
|
|
("foo", ["foo"], "foo_2"),
|
|
("foo", ("foo",), "foo_2"),
|
|
("foo", ("foo", "foo_2"), "foo_3"),
|
|
("foo", ("foo", "foo_2", "foo_2"), "foo_3"),
|
|
),
|
|
)
|
|
def test_ensure_unique_string(preferred_string, current_strings, expected):
|
|
actual = helpers.ensure_unique_string(preferred_string, current_strings)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
(
|
|
("foo", "foo"),
|
|
("foo\nbar", "foo\nbar"),
|
|
("foo\nbar\neek", "foo\n bar\neek"),
|
|
),
|
|
)
|
|
def test_indent_all_but_first_and_last(text, expected):
|
|
actual = helpers.indent_all_but_first_and_last(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
(
|
|
("foo", [" foo"]),
|
|
("foo\nbar", [" foo", " bar"]),
|
|
("foo\nbar\neek", [" foo", " bar", " eek"]),
|
|
),
|
|
)
|
|
def test_indent_list(text, expected):
|
|
actual = helpers.indent_list(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
(
|
|
("foo", " foo"),
|
|
("foo\nbar", " foo\n bar"),
|
|
("foo\nbar\neek", " foo\n bar\n eek"),
|
|
),
|
|
)
|
|
def test_indent(text, expected):
|
|
actual = helpers.indent(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"string, expected",
|
|
(
|
|
("foo", '"foo"'),
|
|
("foo\nbar", '"foo\\012bar"'),
|
|
("foo\\bar", '"foo\\134bar"'),
|
|
('foo "bar"', '"foo \\042bar\\042"'),
|
|
("foo 🐍", '"foo \\360\\237\\220\\215"'),
|
|
),
|
|
)
|
|
def test_cpp_string_escape(string, expected):
|
|
actual = helpers.cpp_string_escape(string)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"host",
|
|
(
|
|
"127.0.0",
|
|
"localhost",
|
|
"127.0.0.b",
|
|
),
|
|
)
|
|
def test_is_ip_address__invalid(host):
|
|
actual = helpers.is_ip_address(host)
|
|
|
|
assert actual is False
|
|
|
|
|
|
@given(value=ip_addresses(v=4).map(str))
|
|
def test_is_ip_address__valid(value):
|
|
actual = helpers.is_ip_address(value)
|
|
|
|
assert actual is True
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"var, value, default, expected",
|
|
(
|
|
("FOO", None, False, False),
|
|
("FOO", None, True, True),
|
|
("FOO", "", False, False),
|
|
("FOO", "False", False, False),
|
|
("FOO", "True", False, True),
|
|
("FOO", "FALSE", True, False),
|
|
("FOO", "fAlSe", True, False),
|
|
("FOO", "Yes", False, True),
|
|
("FOO", "123", False, True),
|
|
),
|
|
)
|
|
def test_get_bool_env(monkeypatch, var, value, default, expected):
|
|
if value is None:
|
|
monkeypatch.delenv(var, raising=False)
|
|
else:
|
|
monkeypatch.setenv(var, value)
|
|
|
|
actual = helpers.get_bool_env(var, default)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize("value, expected", ((None, False), ("Yes", True)))
|
|
def test_is_ha_addon(monkeypatch, value, expected):
|
|
if value is None:
|
|
monkeypatch.delenv("ESPHOME_IS_HA_ADDON", raising=False)
|
|
else:
|
|
monkeypatch.setenv("ESPHOME_IS_HA_ADDON", value)
|
|
|
|
actual = helpers.is_ha_addon()
|
|
|
|
assert actual == expected
|
|
|
|
|
|
def test_walk_files(fixture_path):
|
|
path = fixture_path / "helpers"
|
|
|
|
actual = list(helpers.walk_files(path))
|
|
|
|
# Ensure paths start with the root
|
|
assert all(p.startswith(str(path)) for p in actual)
|
|
|
|
|
|
class Test_write_file_if_changed:
|
|
def test_src_and_dst_match(self, tmp_path):
|
|
text = "A files are unique.\n"
|
|
initial = text
|
|
dst = tmp_path / "file-a.txt"
|
|
dst.write_text(initial)
|
|
|
|
helpers.write_file_if_changed(dst, text)
|
|
|
|
assert dst.read_text() == text
|
|
|
|
def test_src_and_dst_do_not_match(self, tmp_path):
|
|
text = "A files are unique.\n"
|
|
initial = "B files are unique.\n"
|
|
dst = tmp_path / "file-a.txt"
|
|
dst.write_text(initial)
|
|
|
|
helpers.write_file_if_changed(dst, text)
|
|
|
|
assert dst.read_text() == text
|
|
|
|
def test_dst_does_not_exist(self, tmp_path):
|
|
text = "A files are unique.\n"
|
|
dst = tmp_path / "file-a.txt"
|
|
|
|
helpers.write_file_if_changed(dst, text)
|
|
|
|
assert dst.read_text() == text
|
|
|
|
|
|
class Test_copy_file_if_changed:
|
|
def test_src_and_dst_match(self, tmp_path, fixture_path):
|
|
src = fixture_path / "helpers" / "file-a.txt"
|
|
initial = fixture_path / "helpers" / "file-a.txt"
|
|
dst = tmp_path / "file-a.txt"
|
|
|
|
dst.write_text(initial.read_text())
|
|
|
|
helpers.copy_file_if_changed(src, dst)
|
|
|
|
def test_src_and_dst_do_not_match(self, tmp_path, fixture_path):
|
|
src = fixture_path / "helpers" / "file-a.txt"
|
|
initial = fixture_path / "helpers" / "file-c.txt"
|
|
dst = tmp_path / "file-a.txt"
|
|
|
|
dst.write_text(initial.read_text())
|
|
|
|
helpers.copy_file_if_changed(src, dst)
|
|
|
|
assert src.read_text() == dst.read_text()
|
|
|
|
def test_dst_does_not_exist(self, tmp_path, fixture_path):
|
|
src = fixture_path / "helpers" / "file-a.txt"
|
|
dst = tmp_path / "file-a.txt"
|
|
|
|
helpers.copy_file_if_changed(src, dst)
|
|
|
|
assert dst.exists()
|
|
assert src.read_text() == dst.read_text()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"file1, file2, expected",
|
|
(
|
|
# Same file
|
|
("file-a.txt", "file-a.txt", True),
|
|
# Different files, different size
|
|
("file-a.txt", "file-b_1.txt", False),
|
|
# Different files, same size
|
|
("file-a.txt", "file-c.txt", False),
|
|
# Same files
|
|
("file-b_1.txt", "file-b_2.txt", True),
|
|
# Not a file
|
|
("file-a.txt", "", False),
|
|
# File doesn't exist
|
|
("file-a.txt", "file-d.txt", False),
|
|
),
|
|
)
|
|
def test_file_compare(fixture_path, file1, file2, expected):
|
|
path1 = fixture_path / "helpers" / file1
|
|
path2 = fixture_path / "helpers" / file2
|
|
|
|
actual = helpers.file_compare(path1, path2)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
(
|
|
("foo", "foo"),
|
|
("foo bar", "foo_bar"),
|
|
("foo Bar", "foo_bar"),
|
|
("foo BAR", "foo_bar"),
|
|
("foo.bar", "foo.bar"),
|
|
("fooBAR", "foobar"),
|
|
("Foo-bar_EEK", "foo-bar_eek"),
|
|
(" foo", "__foo"),
|
|
),
|
|
)
|
|
def test_snake_case(text, expected):
|
|
actual = helpers.snake_case(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
(
|
|
("foo_bar", "foo_bar"),
|
|
('!"§$%&/()=?foo_bar', "___________foo_bar"),
|
|
('foo_!"§$%&/()=?bar', "foo____________bar"),
|
|
('foo_bar!"§$%&/()=?', "foo_bar___________"),
|
|
('foo-bar!"§$%&/()=?', "foo-bar___________"),
|
|
),
|
|
)
|
|
def test_sanitize(text, expected):
|
|
actual = helpers.sanitize(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"text, expected",
|
|
((["127.0.0.1", "fe80::1", "2001::2"], ["2001::2", "127.0.0.1", "fe80::1"]),),
|
|
)
|
|
def test_sort_ip_addresses(text: list[str], expected: list[str]) -> None:
|
|
actual = helpers.sort_ip_addresses(text)
|
|
|
|
assert actual == expected
|
|
|
|
|
|
# DNS resolution tests
|
|
def test_is_ip_address_ipv4() -> None:
|
|
"""Test is_ip_address with IPv4 addresses."""
|
|
assert helpers.is_ip_address("192.168.1.1") is True
|
|
assert helpers.is_ip_address("127.0.0.1") is True
|
|
assert helpers.is_ip_address("255.255.255.255") is True
|
|
assert helpers.is_ip_address("0.0.0.0") is True
|
|
|
|
|
|
def test_is_ip_address_ipv6() -> None:
|
|
"""Test is_ip_address with IPv6 addresses."""
|
|
assert helpers.is_ip_address("::1") is True
|
|
assert helpers.is_ip_address("2001:db8::1") is True
|
|
assert helpers.is_ip_address("fe80::1") is True
|
|
assert helpers.is_ip_address("::") is True
|
|
|
|
|
|
def test_is_ip_address_invalid() -> None:
|
|
"""Test is_ip_address with non-IP strings."""
|
|
assert helpers.is_ip_address("hostname") is False
|
|
assert helpers.is_ip_address("hostname.local") is False
|
|
assert helpers.is_ip_address("256.256.256.256") is False
|
|
assert helpers.is_ip_address("192.168.1") is False
|
|
assert helpers.is_ip_address("") is False
|
|
|
|
|
|
def test_resolve_ip_address_single_ipv4() -> None:
|
|
"""Test resolving a single IPv4 address (fast path)."""
|
|
result = helpers.resolve_ip_address("192.168.1.100", 6053)
|
|
|
|
assert len(result) == 1
|
|
assert result[0][0] == socket.AF_INET # family
|
|
assert result[0][1] in (
|
|
0,
|
|
socket.SOCK_STREAM,
|
|
) # type (0 on Windows with AI_NUMERICHOST)
|
|
assert result[0][2] in (
|
|
0,
|
|
socket.IPPROTO_TCP,
|
|
) # proto (0 on Windows with AI_NUMERICHOST)
|
|
assert result[0][3] == "" # canonname
|
|
assert result[0][4] == ("192.168.1.100", 6053) # sockaddr
|
|
|
|
|
|
def test_resolve_ip_address_single_ipv6() -> None:
|
|
"""Test resolving a single IPv6 address (fast path)."""
|
|
result = helpers.resolve_ip_address("::1", 6053)
|
|
|
|
assert len(result) == 1
|
|
assert result[0][0] == socket.AF_INET6 # family
|
|
assert result[0][1] in (
|
|
0,
|
|
socket.SOCK_STREAM,
|
|
) # type (0 on Windows with AI_NUMERICHOST)
|
|
assert result[0][2] in (
|
|
0,
|
|
socket.IPPROTO_TCP,
|
|
) # proto (0 on Windows with AI_NUMERICHOST)
|
|
assert result[0][3] == "" # canonname
|
|
# IPv6 sockaddr has 4 elements
|
|
assert len(result[0][4]) == 4
|
|
assert result[0][4][0] == "::1" # address
|
|
assert result[0][4][1] == 6053 # port
|
|
|
|
|
|
def test_resolve_ip_address_list_of_ips() -> None:
|
|
"""Test resolving a list of IP addresses (fast path)."""
|
|
ips = ["192.168.1.100", "10.0.0.1", "::1"]
|
|
result = helpers.resolve_ip_address(ips, 6053)
|
|
|
|
# Should return results sorted by preference (IPv6 first, then IPv4)
|
|
assert len(result) >= 2 # At least IPv4 addresses should work
|
|
|
|
# Check that results are properly formatted
|
|
for addr_info in result:
|
|
assert addr_info[0] in (socket.AF_INET, socket.AF_INET6)
|
|
assert addr_info[1] in (
|
|
0,
|
|
socket.SOCK_STREAM,
|
|
) # 0 on Windows with AI_NUMERICHOST
|
|
assert addr_info[2] in (
|
|
0,
|
|
socket.IPPROTO_TCP,
|
|
) # 0 on Windows with AI_NUMERICHOST
|
|
assert addr_info[3] == ""
|
|
|
|
|
|
def test_resolve_ip_address_with_getaddrinfo_failure(caplog) -> None:
|
|
"""Test that getaddrinfo OSError is handled gracefully in fast path."""
|
|
with (
|
|
caplog.at_level(logging.DEBUG),
|
|
patch("socket.getaddrinfo") as mock_getaddrinfo,
|
|
):
|
|
# First IP succeeds
|
|
mock_getaddrinfo.side_effect = [
|
|
[
|
|
(
|
|
socket.AF_INET,
|
|
socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP,
|
|
"",
|
|
("192.168.1.100", 6053),
|
|
)
|
|
],
|
|
OSError("Failed to resolve"), # Second IP fails
|
|
]
|
|
|
|
# Should continue despite one failure
|
|
result = helpers.resolve_ip_address(["192.168.1.100", "192.168.1.101"], 6053)
|
|
|
|
# Should have result from first IP only
|
|
assert len(result) == 1
|
|
assert result[0][4][0] == "192.168.1.100"
|
|
|
|
# Verify both IPs were attempted
|
|
assert mock_getaddrinfo.call_count == 2
|
|
mock_getaddrinfo.assert_any_call(
|
|
"192.168.1.100", 6053, proto=socket.IPPROTO_TCP, flags=socket.AI_NUMERICHOST
|
|
)
|
|
mock_getaddrinfo.assert_any_call(
|
|
"192.168.1.101", 6053, proto=socket.IPPROTO_TCP, flags=socket.AI_NUMERICHOST
|
|
)
|
|
|
|
# Verify the debug log was called for the failed IP
|
|
assert "Failed to parse IP address '192.168.1.101'" in caplog.text
|
|
|
|
|
|
def test_resolve_ip_address_hostname() -> None:
|
|
"""Test resolving a hostname (async resolver path)."""
|
|
mock_addr_info = AddrInfo(
|
|
family=socket.AF_INET,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053),
|
|
)
|
|
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.return_value = [mock_addr_info]
|
|
|
|
result = helpers.resolve_ip_address("test.local", 6053)
|
|
|
|
assert len(result) == 1
|
|
assert result[0][0] == socket.AF_INET
|
|
assert result[0][4] == ("192.168.1.100", 6053)
|
|
MockResolver.assert_called_once_with(["test.local"], 6053)
|
|
mock_resolver.resolve.assert_called_once()
|
|
|
|
|
|
def test_resolve_ip_address_mixed_list() -> None:
|
|
"""Test resolving a mix of IPs and hostnames."""
|
|
mock_addr_info = AddrInfo(
|
|
family=socket.AF_INET,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv4Sockaddr(address="192.168.1.200", port=6053),
|
|
)
|
|
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.return_value = [mock_addr_info]
|
|
|
|
# Mix of IP and hostname - should use async resolver
|
|
result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053)
|
|
|
|
assert len(result) == 1
|
|
assert result[0][4][0] == "192.168.1.200"
|
|
MockResolver.assert_called_once_with(["192.168.1.100", "test.local"], 6053)
|
|
mock_resolver.resolve.assert_called_once()
|
|
|
|
|
|
def test_resolve_ip_address_url() -> None:
|
|
"""Test extracting hostname from URL."""
|
|
mock_addr_info = AddrInfo(
|
|
family=socket.AF_INET,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053),
|
|
)
|
|
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.return_value = [mock_addr_info]
|
|
|
|
result = helpers.resolve_ip_address("http://test.local", 6053)
|
|
|
|
assert len(result) == 1
|
|
MockResolver.assert_called_once_with(["test.local"], 6053)
|
|
mock_resolver.resolve.assert_called_once()
|
|
|
|
|
|
def test_resolve_ip_address_ipv6_conversion() -> None:
|
|
"""Test proper IPv6 address info conversion."""
|
|
mock_addr_info = AddrInfo(
|
|
family=socket.AF_INET6,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv6Sockaddr(address="2001:db8::1", port=6053, flowinfo=1, scope_id=2),
|
|
)
|
|
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.return_value = [mock_addr_info]
|
|
|
|
result = helpers.resolve_ip_address("test.local", 6053)
|
|
|
|
assert len(result) == 1
|
|
assert result[0][0] == socket.AF_INET6
|
|
assert result[0][4] == ("2001:db8::1", 6053, 1, 2)
|
|
|
|
|
|
def test_resolve_ip_address_error_handling() -> None:
|
|
"""Test error handling from AsyncResolver."""
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.side_effect = EsphomeError("Resolution failed")
|
|
|
|
with pytest.raises(EsphomeError, match="Resolution failed"):
|
|
helpers.resolve_ip_address("test.local", 6053)
|
|
|
|
|
|
def test_addr_preference_ipv4() -> None:
|
|
"""Test address preference for IPv4."""
|
|
addr_info = (
|
|
socket.AF_INET,
|
|
socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP,
|
|
"",
|
|
("192.168.1.1", 6053),
|
|
)
|
|
assert helpers.addr_preference_(addr_info) == 2
|
|
|
|
|
|
def test_addr_preference_ipv6() -> None:
|
|
"""Test address preference for regular IPv6."""
|
|
addr_info = (
|
|
socket.AF_INET6,
|
|
socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP,
|
|
"",
|
|
("2001:db8::1", 6053, 0, 0),
|
|
)
|
|
assert helpers.addr_preference_(addr_info) == 1
|
|
|
|
|
|
def test_addr_preference_ipv6_link_local_no_scope() -> None:
|
|
"""Test address preference for link-local IPv6 without scope."""
|
|
addr_info = (
|
|
socket.AF_INET6,
|
|
socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP,
|
|
"",
|
|
("fe80::1", 6053, 0, 0), # link-local with scope_id=0
|
|
)
|
|
assert helpers.addr_preference_(addr_info) == 3
|
|
|
|
|
|
def test_addr_preference_ipv6_link_local_with_scope() -> None:
|
|
"""Test address preference for link-local IPv6 with scope."""
|
|
addr_info = (
|
|
socket.AF_INET6,
|
|
socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP,
|
|
"",
|
|
("fe80::1", 6053, 0, 2), # link-local with scope_id=2
|
|
)
|
|
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
|
|
|
|
|
|
def test_resolve_ip_address_sorting() -> None:
|
|
"""Test that results are sorted by preference."""
|
|
# Create multiple address infos with different preferences
|
|
mock_addr_infos = [
|
|
AddrInfo(
|
|
family=socket.AF_INET6,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv6Sockaddr(
|
|
address="fe80::1", port=6053, flowinfo=0, scope_id=0
|
|
), # Preference 3 (link-local no scope)
|
|
),
|
|
AddrInfo(
|
|
family=socket.AF_INET,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv4Sockaddr(
|
|
address="192.168.1.100", port=6053
|
|
), # Preference 2 (IPv4)
|
|
),
|
|
AddrInfo(
|
|
family=socket.AF_INET6,
|
|
type=socket.SOCK_STREAM,
|
|
proto=socket.IPPROTO_TCP,
|
|
sockaddr=IPv6Sockaddr(
|
|
address="2001:db8::1", port=6053, flowinfo=0, scope_id=0
|
|
), # Preference 1 (IPv6)
|
|
),
|
|
]
|
|
|
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
|
mock_resolver = MockResolver.return_value
|
|
mock_resolver.resolve.return_value = mock_addr_infos
|
|
|
|
result = helpers.resolve_ip_address("test.local", 6053)
|
|
|
|
# Should be sorted: IPv6 first, then IPv4, then link-local without scope
|
|
assert result[0][4][0] == "2001:db8::1" # IPv6 (preference 1)
|
|
assert result[1][4][0] == "192.168.1.100" # IPv4 (preference 2)
|
|
assert result[2][4][0] == "fe80::1" # Link-local no scope (preference 3)
|