import socket from unittest.mock import patch from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr from hypothesis import given from hypothesis.strategies import ip_addresses import pytest from esphome import helpers from esphome.core import EsphomeError @pytest.mark.parametrize( "preferred_string, current_strings, expected", ( ("foo", [], "foo"), # TODO: Should this actually start at 1? ("foo", ["foo"], "foo_2"), ("foo", ("foo",), "foo_2"), ("foo", ("foo", "foo_2"), "foo_3"), ("foo", ("foo", "foo_2", "foo_2"), "foo_3"), ), ) def test_ensure_unique_string(preferred_string, current_strings, expected): actual = helpers.ensure_unique_string(preferred_string, current_strings) assert actual == expected @pytest.mark.parametrize( "text, expected", ( ("foo", "foo"), ("foo\nbar", "foo\nbar"), ("foo\nbar\neek", "foo\n bar\neek"), ), ) def test_indent_all_but_first_and_last(text, expected): actual = helpers.indent_all_but_first_and_last(text) assert actual == expected @pytest.mark.parametrize( "text, expected", ( ("foo", [" foo"]), ("foo\nbar", [" foo", " bar"]), ("foo\nbar\neek", [" foo", " bar", " eek"]), ), ) def test_indent_list(text, expected): actual = helpers.indent_list(text) assert actual == expected @pytest.mark.parametrize( "text, expected", ( ("foo", " foo"), ("foo\nbar", " foo\n bar"), ("foo\nbar\neek", " foo\n bar\n eek"), ), ) def test_indent(text, expected): actual = helpers.indent(text) assert actual == expected @pytest.mark.parametrize( "string, expected", ( ("foo", '"foo"'), ("foo\nbar", '"foo\\012bar"'), ("foo\\bar", '"foo\\134bar"'), ('foo "bar"', '"foo \\042bar\\042"'), ("foo ๐Ÿ", '"foo \\360\\237\\220\\215"'), ), ) def test_cpp_string_escape(string, expected): actual = helpers.cpp_string_escape(string) assert actual == expected @pytest.mark.parametrize( "host", ( "127.0.0", "localhost", "127.0.0.b", ), ) def test_is_ip_address__invalid(host): actual = helpers.is_ip_address(host) assert actual is False @given(value=ip_addresses(v=4).map(str)) def test_is_ip_address__valid(value): actual = helpers.is_ip_address(value) assert actual is True @pytest.mark.parametrize( "var, value, default, expected", ( ("FOO", None, False, False), ("FOO", None, True, True), ("FOO", "", False, False), ("FOO", "False", False, False), ("FOO", "True", False, True), ("FOO", "FALSE", True, False), ("FOO", "fAlSe", True, False), ("FOO", "Yes", False, True), ("FOO", "123", False, True), ), ) def test_get_bool_env(monkeypatch, var, value, default, expected): if value is None: monkeypatch.delenv(var, raising=False) else: monkeypatch.setenv(var, value) actual = helpers.get_bool_env(var, default) assert actual == expected @pytest.mark.parametrize("value, expected", ((None, False), ("Yes", True))) def test_is_ha_addon(monkeypatch, value, expected): if value is None: monkeypatch.delenv("ESPHOME_IS_HA_ADDON", raising=False) else: monkeypatch.setenv("ESPHOME_IS_HA_ADDON", value) actual = helpers.is_ha_addon() assert actual == expected def test_walk_files(fixture_path): path = fixture_path / "helpers" actual = list(helpers.walk_files(path)) # Ensure paths start with the root assert all(p.startswith(str(path)) for p in actual) class Test_write_file_if_changed: def test_src_and_dst_match(self, tmp_path): text = "A files are unique.\n" initial = text dst = tmp_path / "file-a.txt" dst.write_text(initial) helpers.write_file_if_changed(dst, text) assert dst.read_text() == text def test_src_and_dst_do_not_match(self, tmp_path): text = "A files are unique.\n" initial = "B files are unique.\n" dst = tmp_path / "file-a.txt" dst.write_text(initial) helpers.write_file_if_changed(dst, text) assert dst.read_text() == text def test_dst_does_not_exist(self, tmp_path): text = "A files are unique.\n" dst = tmp_path / "file-a.txt" helpers.write_file_if_changed(dst, text) assert dst.read_text() == text class Test_copy_file_if_changed: def test_src_and_dst_match(self, tmp_path, fixture_path): src = fixture_path / "helpers" / "file-a.txt" initial = fixture_path / "helpers" / "file-a.txt" dst = tmp_path / "file-a.txt" dst.write_text(initial.read_text()) helpers.copy_file_if_changed(src, dst) def test_src_and_dst_do_not_match(self, tmp_path, fixture_path): src = fixture_path / "helpers" / "file-a.txt" initial = fixture_path / "helpers" / "file-c.txt" dst = tmp_path / "file-a.txt" dst.write_text(initial.read_text()) helpers.copy_file_if_changed(src, dst) assert src.read_text() == dst.read_text() def test_dst_does_not_exist(self, tmp_path, fixture_path): src = fixture_path / "helpers" / "file-a.txt" dst = tmp_path / "file-a.txt" helpers.copy_file_if_changed(src, dst) assert dst.exists() assert src.read_text() == dst.read_text() @pytest.mark.parametrize( "file1, file2, expected", ( # Same file ("file-a.txt", "file-a.txt", True), # Different files, different size ("file-a.txt", "file-b_1.txt", False), # Different files, same size ("file-a.txt", "file-c.txt", False), # Same files ("file-b_1.txt", "file-b_2.txt", True), # Not a file ("file-a.txt", "", False), # File doesn't exist ("file-a.txt", "file-d.txt", False), ), ) def test_file_compare(fixture_path, file1, file2, expected): path1 = fixture_path / "helpers" / file1 path2 = fixture_path / "helpers" / file2 actual = helpers.file_compare(path1, path2) assert actual == expected @pytest.mark.parametrize( "text, expected", ( ("foo", "foo"), ("foo bar", "foo_bar"), ("foo Bar", "foo_bar"), ("foo BAR", "foo_bar"), ("foo.bar", "foo.bar"), ("fooBAR", "foobar"), ("Foo-bar_EEK", "foo-bar_eek"), (" foo", "__foo"), ), ) def test_snake_case(text, expected): actual = helpers.snake_case(text) assert actual == expected @pytest.mark.parametrize( "text, expected", ( ("foo_bar", "foo_bar"), ('!"ยง$%&/()=?foo_bar', "___________foo_bar"), ('foo_!"ยง$%&/()=?bar', "foo____________bar"), ('foo_bar!"ยง$%&/()=?', "foo_bar___________"), ('foo-bar!"ยง$%&/()=?', "foo-bar___________"), ), ) def test_sanitize(text, expected): actual = helpers.sanitize(text) assert actual == expected @pytest.mark.parametrize( "text, expected", ((["127.0.0.1", "fe80::1", "2001::2"], ["2001::2", "127.0.0.1", "fe80::1"]),), ) def test_sort_ip_addresses(text: list[str], expected: list[str]) -> None: actual = helpers.sort_ip_addresses(text) assert actual == expected # DNS resolution tests def test_is_ip_address_ipv4() -> None: """Test is_ip_address with IPv4 addresses.""" assert helpers.is_ip_address("192.168.1.1") is True assert helpers.is_ip_address("127.0.0.1") is True assert helpers.is_ip_address("255.255.255.255") is True assert helpers.is_ip_address("0.0.0.0") is True def test_is_ip_address_ipv6() -> None: """Test is_ip_address with IPv6 addresses.""" assert helpers.is_ip_address("::1") is True assert helpers.is_ip_address("2001:db8::1") is True assert helpers.is_ip_address("fe80::1") is True assert helpers.is_ip_address("::") is True def test_is_ip_address_invalid() -> None: """Test is_ip_address with non-IP strings.""" assert helpers.is_ip_address("hostname") is False assert helpers.is_ip_address("hostname.local") is False assert helpers.is_ip_address("256.256.256.256") is False assert helpers.is_ip_address("192.168.1") is False assert helpers.is_ip_address("") is False def test_resolve_ip_address_single_ipv4() -> None: """Test resolving a single IPv4 address (fast path).""" result = helpers.resolve_ip_address("192.168.1.100", 6053) assert len(result) == 1 assert result[0][0] == socket.AF_INET # family assert result[0][1] in ( 0, socket.SOCK_STREAM, ) # type (0 on Windows with AI_NUMERICHOST) assert result[0][2] in ( 0, socket.IPPROTO_TCP, ) # proto (0 on Windows with AI_NUMERICHOST) assert result[0][3] == "" # canonname assert result[0][4] == ("192.168.1.100", 6053) # sockaddr def test_resolve_ip_address_single_ipv6() -> None: """Test resolving a single IPv6 address (fast path).""" result = helpers.resolve_ip_address("::1", 6053) assert len(result) == 1 assert result[0][0] == socket.AF_INET6 # family assert result[0][1] in ( 0, socket.SOCK_STREAM, ) # type (0 on Windows with AI_NUMERICHOST) assert result[0][2] in ( 0, socket.IPPROTO_TCP, ) # proto (0 on Windows with AI_NUMERICHOST) assert result[0][3] == "" # canonname # IPv6 sockaddr has 4 elements assert len(result[0][4]) == 4 assert result[0][4][0] == "::1" # address assert result[0][4][1] == 6053 # port def test_resolve_ip_address_list_of_ips() -> None: """Test resolving a list of IP addresses (fast path).""" ips = ["192.168.1.100", "10.0.0.1", "::1"] result = helpers.resolve_ip_address(ips, 6053) # Should return results sorted by preference (IPv6 first, then IPv4) assert len(result) >= 2 # At least IPv4 addresses should work # Check that results are properly formatted for addr_info in result: assert addr_info[0] in (socket.AF_INET, socket.AF_INET6) assert addr_info[1] in ( 0, socket.SOCK_STREAM, ) # 0 on Windows with AI_NUMERICHOST assert addr_info[2] in ( 0, socket.IPPROTO_TCP, ) # 0 on Windows with AI_NUMERICHOST assert addr_info[3] == "" def test_resolve_ip_address_hostname() -> None: """Test resolving a hostname (async resolver path).""" mock_addr_info = AddrInfo( family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053), ) with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.return_value = [mock_addr_info] result = helpers.resolve_ip_address("test.local", 6053) assert len(result) == 1 assert result[0][0] == socket.AF_INET assert result[0][4] == ("192.168.1.100", 6053) mock_resolver.run.assert_called_once_with(["test.local"], 6053) def test_resolve_ip_address_mixed_list() -> None: """Test resolving a mix of IPs and hostnames.""" mock_addr_info = AddrInfo( family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv4Sockaddr(address="192.168.1.200", port=6053), ) with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.return_value = [mock_addr_info] # Mix of IP and hostname - should use async resolver result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053) assert len(result) == 1 assert result[0][4][0] == "192.168.1.200" mock_resolver.run.assert_called_once_with(["192.168.1.100", "test.local"], 6053) def test_resolve_ip_address_url() -> None: """Test extracting hostname from URL.""" mock_addr_info = AddrInfo( family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053), ) with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.return_value = [mock_addr_info] result = helpers.resolve_ip_address("http://test.local", 6053) assert len(result) == 1 mock_resolver.run.assert_called_once_with(["test.local"], 6053) def test_resolve_ip_address_ipv6_conversion() -> None: """Test proper IPv6 address info conversion.""" mock_addr_info = AddrInfo( family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv6Sockaddr(address="2001:db8::1", port=6053, flowinfo=1, scope_id=2), ) with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.return_value = [mock_addr_info] result = helpers.resolve_ip_address("test.local", 6053) assert len(result) == 1 assert result[0][0] == socket.AF_INET6 assert result[0][4] == ("2001:db8::1", 6053, 1, 2) def test_resolve_ip_address_error_handling() -> None: """Test error handling from AsyncResolver.""" with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.side_effect = EsphomeError("Resolution failed") with pytest.raises(EsphomeError, match="Resolution failed"): helpers.resolve_ip_address("test.local", 6053) def test_addr_preference_ipv4() -> None: """Test address preference for IPv4.""" addr_info = ( socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("192.168.1.1", 6053), ) assert helpers.addr_preference_(addr_info) == 2 def test_addr_preference_ipv6() -> None: """Test address preference for regular IPv6.""" addr_info = ( socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("2001:db8::1", 6053, 0, 0), ) assert helpers.addr_preference_(addr_info) == 1 def test_addr_preference_ipv6_link_local_no_scope() -> None: """Test address preference for link-local IPv6 without scope.""" addr_info = ( socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("fe80::1", 6053, 0, 0), # link-local with scope_id=0 ) assert helpers.addr_preference_(addr_info) == 3 def test_addr_preference_ipv6_link_local_with_scope() -> None: """Test address preference for link-local IPv6 with scope.""" addr_info = ( socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("fe80::1", 6053, 0, 2), # link-local with scope_id=2 ) assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable def test_resolve_ip_address_sorting() -> None: """Test that results are sorted by preference.""" # Create multiple address infos with different preferences mock_addr_infos = [ AddrInfo( family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv6Sockaddr( address="fe80::1", port=6053, flowinfo=0, scope_id=0 ), # Preference 3 (link-local no scope) ), AddrInfo( family=socket.AF_INET, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv4Sockaddr( address="192.168.1.100", port=6053 ), # Preference 2 (IPv4) ), AddrInfo( family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP, sockaddr=IPv6Sockaddr( address="2001:db8::1", port=6053, flowinfo=0, scope_id=0 ), # Preference 1 (IPv6) ), ] with patch("esphome.resolver.AsyncResolver") as MockResolver: mock_resolver = MockResolver.return_value mock_resolver.run.return_value = mock_addr_infos result = helpers.resolve_ip_address("test.local", 6053) # Should be sorted: IPv6 first, then IPv4, then link-local without scope assert result[0][4][0] == "2001:db8::1" # IPv6 (preference 1) assert result[1][4][0] == "192.168.1.100" # IPv4 (preference 2) assert result[2][4][0] == "fe80::1" # Link-local no scope (preference 3)