mirror of
https://github.com/esphome/esphome.git
synced 2025-09-06 21:32:21 +01:00
Merge branch 'align_resolver' into integration
This commit is contained in:
2
.github/actions/restore-python/action.yml
vendored
2
.github/actions/restore-python/action.yml
vendored
@@ -17,7 +17,7 @@ runs:
|
||||
steps:
|
||||
- name: Set up Python ${{ inputs.python-version }}
|
||||
id: python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: ${{ inputs.python-version }}
|
||||
- name: Restore Python virtual environment
|
||||
|
2
.github/workflows/auto-label-pr.yml
vendored
2
.github/workflows/auto-label-pr.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
private-key: ${{ secrets.ESPHOME_GITHUB_APP_PRIVATE_KEY }}
|
||||
|
||||
- name: Auto Label PR
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
github-token: ${{ steps.generate-token.outputs.token }}
|
||||
script: |
|
||||
|
6
.github/workflows/ci-api-proto.yml
vendored
6
.github/workflows/ci-api-proto.yml
vendored
@@ -23,7 +23,7 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5.0.0
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.11"
|
||||
|
||||
@@ -47,7 +47,7 @@ jobs:
|
||||
fi
|
||||
- if: failure()
|
||||
name: Review PR
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
await github.rest.pulls.createReview({
|
||||
@@ -70,7 +70,7 @@ jobs:
|
||||
esphome/components/api/api_pb2_service.*
|
||||
- if: success()
|
||||
name: Dismiss review
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
let reviews = await github.rest.pulls.listReviews({
|
||||
|
6
.github/workflows/ci-clang-tidy-hash.yml
vendored
6
.github/workflows/ci-clang-tidy-hash.yml
vendored
@@ -23,7 +23,7 @@ jobs:
|
||||
uses: actions/checkout@v5.0.0
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.11"
|
||||
|
||||
@@ -41,7 +41,7 @@ jobs:
|
||||
|
||||
- if: failure()
|
||||
name: Request changes
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
await github.rest.pulls.createReview({
|
||||
@@ -54,7 +54,7 @@ jobs:
|
||||
|
||||
- if: success()
|
||||
name: Dismiss review
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
let reviews = await github.rest.pulls.listReviews({
|
||||
|
2
.github/workflows/ci-docker.yml
vendored
2
.github/workflows/ci-docker.yml
vendored
@@ -45,7 +45,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v5.0.0
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.11"
|
||||
- name: Set up Docker Buildx
|
||||
|
6
.github/workflows/ci.yml
vendored
6
.github/workflows/ci.yml
vendored
@@ -42,7 +42,7 @@ jobs:
|
||||
run: echo key="${{ hashFiles('requirements.txt', 'requirements_test.txt', '.pre-commit-config.yaml') }}" >> $GITHUB_OUTPUT
|
||||
- name: Set up Python ${{ env.DEFAULT_PYTHON }}
|
||||
id: python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: ${{ env.DEFAULT_PYTHON }}
|
||||
- name: Restore Python virtual environment
|
||||
@@ -156,7 +156,7 @@ jobs:
|
||||
. venv/bin/activate
|
||||
pytest -vv --cov-report=xml --tb=native -n auto tests --ignore=tests/integration/
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v5.5.0
|
||||
uses: codecov/codecov-action@v5.5.1
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
- name: Save Python virtual environment cache
|
||||
@@ -217,7 +217,7 @@ jobs:
|
||||
uses: actions/checkout@v5.0.0
|
||||
- name: Set up Python 3.13
|
||||
id: python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.13"
|
||||
- name: Restore Python virtual environment
|
||||
|
@@ -25,7 +25,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Request reviews from component codeowners
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
const owner = context.repo.owner;
|
||||
|
2
.github/workflows/external-component-bot.yml
vendored
2
.github/workflows/external-component-bot.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Add external component comment
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
|
2
.github/workflows/issue-codeowner-notify.yml
vendored
2
.github/workflows/issue-codeowner-notify.yml
vendored
@@ -19,7 +19,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Notify codeowners for component issues
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
const owner = context.repo.owner;
|
||||
|
8
.github/workflows/release.yml
vendored
8
.github/workflows/release.yml
vendored
@@ -62,7 +62,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v5.0.0
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.x"
|
||||
- name: Build
|
||||
@@ -94,7 +94,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v5.0.0
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: "3.11"
|
||||
|
||||
@@ -220,7 +220,7 @@ jobs:
|
||||
- deploy-manifest
|
||||
steps:
|
||||
- name: Trigger Workflow
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
github-token: ${{ secrets.DEPLOY_HA_ADDON_REPO_TOKEN }}
|
||||
script: |
|
||||
@@ -246,7 +246,7 @@ jobs:
|
||||
environment: ${{ needs.init.outputs.deploy_env }}
|
||||
steps:
|
||||
- name: Trigger Workflow
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
github-token: ${{ secrets.DEPLOY_ESPHOME_SCHEMA_REPO_TOKEN }}
|
||||
script: |
|
||||
|
4
.github/workflows/stale.yml
vendored
4
.github/workflows/stale.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
stale:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/stale@v9.1.0
|
||||
- uses: actions/stale@v10.0.0
|
||||
with:
|
||||
days-before-pr-stale: 90
|
||||
days-before-pr-close: 7
|
||||
@@ -37,7 +37,7 @@ jobs:
|
||||
close-issues:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/stale@v9.1.0
|
||||
- uses: actions/stale@v10.0.0
|
||||
with:
|
||||
days-before-pr-stale: -1
|
||||
days-before-pr-close: -1
|
||||
|
2
.github/workflows/status-check-labels.yml
vendored
2
.github/workflows/status-check-labels.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
||||
- merge-after-release
|
||||
steps:
|
||||
- name: Check for ${{ matrix.label }} label
|
||||
uses: actions/github-script@v7.0.1
|
||||
uses: actions/github-script@v8.0.0
|
||||
with:
|
||||
script: |
|
||||
const { data: labels } = await github.rest.issues.listLabelsOnIssue({
|
||||
|
2
.github/workflows/sync-device-classes.yml
vendored
2
.github/workflows/sync-device-classes.yml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
path: lib/home-assistant
|
||||
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5.6.0
|
||||
uses: actions/setup-python@v6.0.0
|
||||
with:
|
||||
python-version: 3.13
|
||||
|
||||
|
@@ -11,7 +11,7 @@ ci:
|
||||
repos:
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
# Ruff version.
|
||||
rev: v0.12.11
|
||||
rev: v0.12.12
|
||||
hooks:
|
||||
# Run the linter.
|
||||
- id: ruff
|
||||
|
@@ -396,7 +396,10 @@ def check_permissions(port: str):
|
||||
)
|
||||
|
||||
|
||||
def upload_program(config: ConfigType, args: ArgsProtocol, host: str) -> int | str:
|
||||
def upload_program(
|
||||
config: ConfigType, args: ArgsProtocol, devices: list[str]
|
||||
) -> int | str:
|
||||
host = devices[0]
|
||||
try:
|
||||
module = importlib.import_module("esphome.components." + CORE.target_platform)
|
||||
if getattr(module, "upload_program")(config, args, host):
|
||||
@@ -433,10 +436,10 @@ def upload_program(config: ConfigType, args: ArgsProtocol, host: str) -> int | s
|
||||
|
||||
remote_port = int(ota_conf[CONF_PORT])
|
||||
password = ota_conf.get(CONF_PASSWORD, "")
|
||||
binary = args.file if getattr(args, "file", None) is not None else CORE.firmware_bin
|
||||
|
||||
# Check if we should use MQTT for address resolution
|
||||
# This happens when no device was specified, or the current host is "MQTT"/"OTA"
|
||||
devices: list[str] = args.device or []
|
||||
if (
|
||||
CONF_MQTT in config # pylint: disable=too-many-boolean-expressions
|
||||
and (not devices or host in ("MQTT", "OTA"))
|
||||
@@ -447,14 +450,13 @@ def upload_program(config: ConfigType, args: ArgsProtocol, host: str) -> int | s
|
||||
):
|
||||
from esphome import mqtt
|
||||
|
||||
host = mqtt.get_esphome_device_ip(
|
||||
config, args.username, args.password, args.client_id
|
||||
)
|
||||
devices = [
|
||||
mqtt.get_esphome_device_ip(
|
||||
config, args.username, args.password, args.client_id
|
||||
)
|
||||
]
|
||||
|
||||
if getattr(args, "file", None) is not None:
|
||||
return espota2.run_ota(host, remote_port, password, args.file)
|
||||
|
||||
return espota2.run_ota(host, remote_port, password, CORE.firmware_bin)
|
||||
return espota2.run_ota(devices, remote_port, password, binary)
|
||||
|
||||
|
||||
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
|
||||
@@ -558,17 +560,11 @@ def command_upload(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
purpose="uploading",
|
||||
)
|
||||
|
||||
# Try each device until one succeeds
|
||||
exit_code = 1
|
||||
for device in devices:
|
||||
_LOGGER.info("Uploading to %s", device)
|
||||
exit_code = upload_program(config, args, device)
|
||||
if exit_code == 0:
|
||||
_LOGGER.info("Successfully uploaded program.")
|
||||
return 0
|
||||
if len(devices) > 1:
|
||||
_LOGGER.warning("Failed to upload to %s", device)
|
||||
|
||||
exit_code = upload_program(config, args, devices)
|
||||
if exit_code == 0:
|
||||
_LOGGER.info("Successfully uploaded program.")
|
||||
else:
|
||||
_LOGGER.warning("Failed to upload to %s", devices)
|
||||
return exit_code
|
||||
|
||||
|
||||
|
@@ -308,8 +308,12 @@ def perform_ota(
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def run_ota_impl_(remote_host, remote_port, password, filename):
|
||||
def run_ota_impl_(
|
||||
remote_host: str | list[str], remote_port: int, password: str, filename: str
|
||||
) -> int:
|
||||
# Handle both single host and list of hosts
|
||||
try:
|
||||
# Resolve all hosts at once for parallel DNS resolution
|
||||
res = resolve_ip_address(remote_host, remote_port)
|
||||
except EsphomeError as err:
|
||||
_LOGGER.error(
|
||||
@@ -350,7 +354,9 @@ def run_ota_impl_(remote_host, remote_port, password, filename):
|
||||
return 1
|
||||
|
||||
|
||||
def run_ota(remote_host, remote_port, password, filename):
|
||||
def run_ota(
|
||||
remote_host: str | list[str], remote_port: int, password: str, filename: str
|
||||
) -> int:
|
||||
try:
|
||||
return run_ota_impl_(remote_host, remote_port, password, filename)
|
||||
except OTAError as err:
|
||||
|
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import codecs
|
||||
from contextlib import suppress
|
||||
import ipaddress
|
||||
@@ -11,6 +13,18 @@ from urllib.parse import urlparse
|
||||
|
||||
from esphome.const import __version__ as ESPHOME_VERSION
|
||||
|
||||
# Type aliases for socket address information
|
||||
AddrInfo = tuple[
|
||||
int, # family (AF_INET, AF_INET6, etc.)
|
||||
int, # type (SOCK_STREAM, SOCK_DGRAM, etc.)
|
||||
int, # proto (IPPROTO_TCP, etc.)
|
||||
str, # canonname
|
||||
tuple[str, int] | tuple[str, int, int, int], # sockaddr (IPv4 or IPv6)
|
||||
]
|
||||
IPv4SockAddr = tuple[str, int] # (host, port)
|
||||
IPv6SockAddr = tuple[str, int, int, int] # (host, port, flowinfo, scope_id)
|
||||
SockAddr = IPv4SockAddr | IPv6SockAddr
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
IS_MACOS = platform.system() == "Darwin"
|
||||
@@ -147,32 +161,7 @@ def is_ip_address(host):
|
||||
return False
|
||||
|
||||
|
||||
def _resolve_with_zeroconf(host):
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.zeroconf import EsphomeZeroconf
|
||||
|
||||
try:
|
||||
zc = EsphomeZeroconf()
|
||||
except Exception as err:
|
||||
raise EsphomeError(
|
||||
"Cannot start mDNS sockets, is this a docker container without "
|
||||
"host network mode?"
|
||||
) from err
|
||||
try:
|
||||
info = zc.resolve_host(f"{host}.")
|
||||
except Exception as err:
|
||||
raise EsphomeError(f"Error resolving mDNS hostname: {err}") from err
|
||||
finally:
|
||||
zc.close()
|
||||
if info is None:
|
||||
raise EsphomeError(
|
||||
"Error resolving address with mDNS: Did not respond. "
|
||||
"Maybe the device is offline."
|
||||
)
|
||||
return info
|
||||
|
||||
|
||||
def addr_preference_(res):
|
||||
def addr_preference_(res: AddrInfo) -> int:
|
||||
# Trivial alternative to RFC6724 sorting. Put sane IPv6 first, then
|
||||
# Legacy IP, then IPv6 link-local addresses without an actual link.
|
||||
sa = res[4]
|
||||
@@ -184,66 +173,70 @@ def addr_preference_(res):
|
||||
return 1
|
||||
|
||||
|
||||
def resolve_ip_address(host, port):
|
||||
def resolve_ip_address(host: str | list[str], port: int) -> list[AddrInfo]:
|
||||
import socket
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
|
||||
# There are five cases here. The host argument could be one of:
|
||||
# • a *list* of IP addresses discovered by MQTT,
|
||||
# • a single IP address specified by the user,
|
||||
# • a .local hostname to be resolved by mDNS,
|
||||
# • a normal hostname to be resolved in DNS, or
|
||||
# • A URL from which we should extract the hostname.
|
||||
#
|
||||
# In each of the first three cases, we end up with IP addresses in
|
||||
# string form which need to be converted to a 5-tuple to be used
|
||||
# for the socket connection attempt. The easiest way to construct
|
||||
# those is to pass the IP address string to getaddrinfo(). Which,
|
||||
# coincidentally, is how we do hostname lookups in the other cases
|
||||
# too. So first build a list which contains either IP addresses or
|
||||
# a single hostname, then call getaddrinfo() on each element of
|
||||
# that list.
|
||||
|
||||
errs = []
|
||||
hosts: list[str]
|
||||
if isinstance(host, list):
|
||||
addr_list = host
|
||||
elif is_ip_address(host):
|
||||
addr_list = [host]
|
||||
hosts = host
|
||||
else:
|
||||
url = urlparse(host)
|
||||
if url.scheme != "":
|
||||
host = url.hostname
|
||||
if not is_ip_address(host):
|
||||
url = urlparse(host)
|
||||
if url.scheme != "":
|
||||
host = url.hostname
|
||||
hosts = [host]
|
||||
|
||||
addr_list = []
|
||||
if host.endswith(".local"):
|
||||
res: list[AddrInfo] = []
|
||||
if all(is_ip_address(h) for h in hosts):
|
||||
# Fast path: all are IP addresses, use socket.getaddrinfo with AI_NUMERICHOST
|
||||
for addr in hosts:
|
||||
try:
|
||||
_LOGGER.info("Resolving IP address of %s in mDNS", host)
|
||||
addr_list = _resolve_with_zeroconf(host)
|
||||
except EsphomeError as err:
|
||||
errs.append(str(err))
|
||||
res += socket.getaddrinfo(
|
||||
addr, port, proto=socket.IPPROTO_TCP, flags=socket.AI_NUMERICHOST
|
||||
)
|
||||
except OSError:
|
||||
_LOGGER.debug("Failed to parse IP address '%s'", addr)
|
||||
# Sort by preference
|
||||
res.sort(key=addr_preference_)
|
||||
return res
|
||||
|
||||
# If not mDNS, or if mDNS failed, use normal DNS
|
||||
if not addr_list:
|
||||
addr_list = [host]
|
||||
from esphome.resolver import AsyncResolver
|
||||
|
||||
# Now we have a list containing either IP addresses or a hostname
|
||||
res = []
|
||||
for addr in addr_list:
|
||||
if not is_ip_address(addr):
|
||||
_LOGGER.info("Resolving IP address of %s", host)
|
||||
try:
|
||||
r = socket.getaddrinfo(addr, port, proto=socket.IPPROTO_TCP)
|
||||
except OSError as err:
|
||||
errs.append(str(err))
|
||||
raise EsphomeError(
|
||||
f"Error resolving IP address: {', '.join(errs)}"
|
||||
) from err
|
||||
resolver = AsyncResolver(hosts, port)
|
||||
addr_infos = resolver.resolve()
|
||||
# Convert aioesphomeapi AddrInfo to our format
|
||||
for addr_info in addr_infos:
|
||||
sockaddr = addr_info.sockaddr
|
||||
if addr_info.family == socket.AF_INET6:
|
||||
# IPv6
|
||||
sockaddr_tuple = (
|
||||
sockaddr.address,
|
||||
sockaddr.port,
|
||||
sockaddr.flowinfo,
|
||||
sockaddr.scope_id,
|
||||
)
|
||||
else:
|
||||
# IPv4
|
||||
sockaddr_tuple = (sockaddr.address, sockaddr.port)
|
||||
|
||||
res = res + r
|
||||
res.append(
|
||||
(
|
||||
addr_info.family,
|
||||
addr_info.type,
|
||||
addr_info.proto,
|
||||
"", # canonname
|
||||
sockaddr_tuple,
|
||||
)
|
||||
)
|
||||
|
||||
# Zeroconf tends to give us link-local IPv6 addresses without specifying
|
||||
# the link. Put those last in the list to be attempted.
|
||||
# Sort by preference
|
||||
res.sort(key=addr_preference_)
|
||||
return res
|
||||
|
||||
@@ -262,15 +255,7 @@ def sort_ip_addresses(address_list: list[str]) -> list[str]:
|
||||
|
||||
# First "resolve" all the IP addresses to getaddrinfo() tuples of the form
|
||||
# (family, type, proto, canonname, sockaddr)
|
||||
res: list[
|
||||
tuple[
|
||||
int,
|
||||
int,
|
||||
int,
|
||||
str | None,
|
||||
tuple[str, int] | tuple[str, int, int, int],
|
||||
]
|
||||
] = []
|
||||
res: list[AddrInfo] = []
|
||||
for addr in address_list:
|
||||
# This should always work as these are supposed to be IP addresses
|
||||
try:
|
||||
|
67
esphome/resolver.py
Normal file
67
esphome/resolver.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""DNS resolver for ESPHome using aioesphomeapi."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
|
||||
from aioesphomeapi.core import ResolveAPIError, ResolveTimeoutAPIError
|
||||
import aioesphomeapi.host_resolver as hr
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
|
||||
RESOLVE_TIMEOUT = 10.0 # seconds
|
||||
|
||||
|
||||
class AsyncResolver(threading.Thread):
|
||||
"""Resolver using aioesphomeapi that runs in a thread for faster results.
|
||||
|
||||
This resolver uses aioesphomeapi's async_resolve_host to handle DNS resolution,
|
||||
including proper .local domain fallback. Running in a thread allows us to get
|
||||
the result immediately without waiting for asyncio.run() to complete its
|
||||
cleanup cycle, which can take significant time.
|
||||
"""
|
||||
|
||||
def __init__(self, hosts: list[str], port: int) -> None:
|
||||
"""Initialize the resolver."""
|
||||
super().__init__(daemon=True)
|
||||
self.hosts = hosts
|
||||
self.port = port
|
||||
self.result: list[hr.AddrInfo] | None = None
|
||||
self.exception: Exception | None = None
|
||||
self.event = threading.Event()
|
||||
|
||||
async def _resolve(self) -> None:
|
||||
"""Resolve hostnames to IP addresses."""
|
||||
try:
|
||||
self.result = await hr.async_resolve_host(
|
||||
self.hosts, self.port, timeout=RESOLVE_TIMEOUT
|
||||
)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
# We need to catch all exceptions to ensure the event is set
|
||||
# Otherwise the thread could hang forever
|
||||
self.exception = e
|
||||
finally:
|
||||
self.event.set()
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the DNS resolution."""
|
||||
asyncio.run(self._resolve())
|
||||
|
||||
def resolve(self) -> list[hr.AddrInfo]:
|
||||
"""Start the thread and wait for the result."""
|
||||
self.start()
|
||||
|
||||
if not self.event.wait(
|
||||
timeout=RESOLVE_TIMEOUT + 1.0
|
||||
): # Give it 1 second more than the resolver timeout
|
||||
raise EsphomeError("Timeout resolving IP address")
|
||||
|
||||
if exc := self.exception:
|
||||
if isinstance(exc, ResolveTimeoutAPIError):
|
||||
raise EsphomeError(f"Timeout resolving IP address: {exc}") from exc
|
||||
if isinstance(exc, ResolveAPIError):
|
||||
raise EsphomeError(f"Error resolving IP address: {exc}") from exc
|
||||
raise exc
|
||||
|
||||
return self.result
|
@@ -11,8 +11,8 @@ pyserial==3.5
|
||||
platformio==6.1.18 # When updating platformio, also update /docker/Dockerfile
|
||||
esptool==5.0.2
|
||||
click==8.1.7
|
||||
esphome-dashboard==20250828.0
|
||||
aioesphomeapi==39.0.1
|
||||
esphome-dashboard==20250904.0
|
||||
aioesphomeapi==40.0.0
|
||||
zeroconf==0.147.0
|
||||
puremagic==1.30
|
||||
ruamel.yaml==0.18.15 # dashboard_import
|
||||
|
@@ -1,13 +1,13 @@
|
||||
pylint==3.3.8
|
||||
flake8==7.3.0 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.12.11 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.12.12 # also change in .pre-commit-config.yaml when updating
|
||||
pyupgrade==3.20.0 # also change in .pre-commit-config.yaml when updating
|
||||
pre-commit
|
||||
|
||||
# Unit tests
|
||||
pytest==8.4.1
|
||||
pytest==8.4.2
|
||||
pytest-cov==6.2.1
|
||||
pytest-mock==3.14.1
|
||||
pytest-mock==3.15.0
|
||||
pytest-asyncio==1.1.0
|
||||
pytest-xdist==3.8.0
|
||||
asyncmock==0.4.2
|
||||
|
@@ -1,8 +1,14 @@
|
||||
import logging
|
||||
import socket
|
||||
from unittest.mock import patch
|
||||
|
||||
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
||||
from hypothesis import given
|
||||
from hypothesis.strategies import ip_addresses
|
||||
import pytest
|
||||
|
||||
from esphome import helpers
|
||||
from esphome.core import EsphomeError
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -277,3 +283,314 @@ def test_sort_ip_addresses(text: list[str], expected: list[str]) -> None:
|
||||
actual = helpers.sort_ip_addresses(text)
|
||||
|
||||
assert actual == expected
|
||||
|
||||
|
||||
# DNS resolution tests
|
||||
def test_is_ip_address_ipv4() -> None:
|
||||
"""Test is_ip_address with IPv4 addresses."""
|
||||
assert helpers.is_ip_address("192.168.1.1") is True
|
||||
assert helpers.is_ip_address("127.0.0.1") is True
|
||||
assert helpers.is_ip_address("255.255.255.255") is True
|
||||
assert helpers.is_ip_address("0.0.0.0") is True
|
||||
|
||||
|
||||
def test_is_ip_address_ipv6() -> None:
|
||||
"""Test is_ip_address with IPv6 addresses."""
|
||||
assert helpers.is_ip_address("::1") is True
|
||||
assert helpers.is_ip_address("2001:db8::1") is True
|
||||
assert helpers.is_ip_address("fe80::1") is True
|
||||
assert helpers.is_ip_address("::") is True
|
||||
|
||||
|
||||
def test_is_ip_address_invalid() -> None:
|
||||
"""Test is_ip_address with non-IP strings."""
|
||||
assert helpers.is_ip_address("hostname") is False
|
||||
assert helpers.is_ip_address("hostname.local") is False
|
||||
assert helpers.is_ip_address("256.256.256.256") is False
|
||||
assert helpers.is_ip_address("192.168.1") is False
|
||||
assert helpers.is_ip_address("") is False
|
||||
|
||||
|
||||
def test_resolve_ip_address_single_ipv4() -> None:
|
||||
"""Test resolving a single IPv4 address (fast path)."""
|
||||
result = helpers.resolve_ip_address("192.168.1.100", 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == socket.AF_INET # family
|
||||
assert result[0][1] in (
|
||||
0,
|
||||
socket.SOCK_STREAM,
|
||||
) # type (0 on Windows with AI_NUMERICHOST)
|
||||
assert result[0][2] in (
|
||||
0,
|
||||
socket.IPPROTO_TCP,
|
||||
) # proto (0 on Windows with AI_NUMERICHOST)
|
||||
assert result[0][3] == "" # canonname
|
||||
assert result[0][4] == ("192.168.1.100", 6053) # sockaddr
|
||||
|
||||
|
||||
def test_resolve_ip_address_single_ipv6() -> None:
|
||||
"""Test resolving a single IPv6 address (fast path)."""
|
||||
result = helpers.resolve_ip_address("::1", 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == socket.AF_INET6 # family
|
||||
assert result[0][1] in (
|
||||
0,
|
||||
socket.SOCK_STREAM,
|
||||
) # type (0 on Windows with AI_NUMERICHOST)
|
||||
assert result[0][2] in (
|
||||
0,
|
||||
socket.IPPROTO_TCP,
|
||||
) # proto (0 on Windows with AI_NUMERICHOST)
|
||||
assert result[0][3] == "" # canonname
|
||||
# IPv6 sockaddr has 4 elements
|
||||
assert len(result[0][4]) == 4
|
||||
assert result[0][4][0] == "::1" # address
|
||||
assert result[0][4][1] == 6053 # port
|
||||
|
||||
|
||||
def test_resolve_ip_address_list_of_ips() -> None:
|
||||
"""Test resolving a list of IP addresses (fast path)."""
|
||||
ips = ["192.168.1.100", "10.0.0.1", "::1"]
|
||||
result = helpers.resolve_ip_address(ips, 6053)
|
||||
|
||||
# Should return results sorted by preference (IPv6 first, then IPv4)
|
||||
assert len(result) >= 2 # At least IPv4 addresses should work
|
||||
|
||||
# Check that results are properly formatted
|
||||
for addr_info in result:
|
||||
assert addr_info[0] in (socket.AF_INET, socket.AF_INET6)
|
||||
assert addr_info[1] in (
|
||||
0,
|
||||
socket.SOCK_STREAM,
|
||||
) # 0 on Windows with AI_NUMERICHOST
|
||||
assert addr_info[2] in (
|
||||
0,
|
||||
socket.IPPROTO_TCP,
|
||||
) # 0 on Windows with AI_NUMERICHOST
|
||||
assert addr_info[3] == ""
|
||||
|
||||
|
||||
def test_resolve_ip_address_with_getaddrinfo_failure(caplog) -> None:
|
||||
"""Test that getaddrinfo OSError is handled gracefully in fast path."""
|
||||
with (
|
||||
caplog.at_level(logging.DEBUG),
|
||||
patch("socket.getaddrinfo") as mock_getaddrinfo,
|
||||
):
|
||||
# First IP succeeds
|
||||
mock_getaddrinfo.side_effect = [
|
||||
[
|
||||
(
|
||||
socket.AF_INET,
|
||||
socket.SOCK_STREAM,
|
||||
socket.IPPROTO_TCP,
|
||||
"",
|
||||
("192.168.1.100", 6053),
|
||||
)
|
||||
],
|
||||
OSError("Failed to resolve"), # Second IP fails
|
||||
]
|
||||
|
||||
# Should continue despite one failure
|
||||
result = helpers.resolve_ip_address(["192.168.1.100", "192.168.1.101"], 6053)
|
||||
|
||||
# Should have result from first IP only
|
||||
assert len(result) == 1
|
||||
assert result[0][4][0] == "192.168.1.100"
|
||||
|
||||
# Verify both IPs were attempted
|
||||
assert mock_getaddrinfo.call_count == 2
|
||||
mock_getaddrinfo.assert_any_call(
|
||||
"192.168.1.100", 6053, proto=socket.IPPROTO_TCP, flags=socket.AI_NUMERICHOST
|
||||
)
|
||||
mock_getaddrinfo.assert_any_call(
|
||||
"192.168.1.101", 6053, proto=socket.IPPROTO_TCP, flags=socket.AI_NUMERICHOST
|
||||
)
|
||||
|
||||
# Verify the debug log was called for the failed IP
|
||||
assert "Failed to parse IP address '192.168.1.101'" in caplog.text
|
||||
|
||||
|
||||
def test_resolve_ip_address_hostname() -> None:
|
||||
"""Test resolving a hostname (async resolver path)."""
|
||||
mock_addr_info = AddrInfo(
|
||||
family=socket.AF_INET,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053),
|
||||
)
|
||||
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.return_value = [mock_addr_info]
|
||||
|
||||
result = helpers.resolve_ip_address("test.local", 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == socket.AF_INET
|
||||
assert result[0][4] == ("192.168.1.100", 6053)
|
||||
MockResolver.assert_called_once_with(["test.local"], 6053)
|
||||
mock_resolver.resolve.assert_called_once()
|
||||
|
||||
|
||||
def test_resolve_ip_address_mixed_list() -> None:
|
||||
"""Test resolving a mix of IPs and hostnames."""
|
||||
mock_addr_info = AddrInfo(
|
||||
family=socket.AF_INET,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv4Sockaddr(address="192.168.1.200", port=6053),
|
||||
)
|
||||
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.return_value = [mock_addr_info]
|
||||
|
||||
# Mix of IP and hostname - should use async resolver
|
||||
result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0][4][0] == "192.168.1.200"
|
||||
MockResolver.assert_called_once_with(["192.168.1.100", "test.local"], 6053)
|
||||
mock_resolver.resolve.assert_called_once()
|
||||
|
||||
|
||||
def test_resolve_ip_address_url() -> None:
|
||||
"""Test extracting hostname from URL."""
|
||||
mock_addr_info = AddrInfo(
|
||||
family=socket.AF_INET,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053),
|
||||
)
|
||||
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.return_value = [mock_addr_info]
|
||||
|
||||
result = helpers.resolve_ip_address("http://test.local", 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
MockResolver.assert_called_once_with(["test.local"], 6053)
|
||||
mock_resolver.resolve.assert_called_once()
|
||||
|
||||
|
||||
def test_resolve_ip_address_ipv6_conversion() -> None:
|
||||
"""Test proper IPv6 address info conversion."""
|
||||
mock_addr_info = AddrInfo(
|
||||
family=socket.AF_INET6,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv6Sockaddr(address="2001:db8::1", port=6053, flowinfo=1, scope_id=2),
|
||||
)
|
||||
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.return_value = [mock_addr_info]
|
||||
|
||||
result = helpers.resolve_ip_address("test.local", 6053)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == socket.AF_INET6
|
||||
assert result[0][4] == ("2001:db8::1", 6053, 1, 2)
|
||||
|
||||
|
||||
def test_resolve_ip_address_error_handling() -> None:
|
||||
"""Test error handling from AsyncResolver."""
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.side_effect = EsphomeError("Resolution failed")
|
||||
|
||||
with pytest.raises(EsphomeError, match="Resolution failed"):
|
||||
helpers.resolve_ip_address("test.local", 6053)
|
||||
|
||||
|
||||
def test_addr_preference_ipv4() -> None:
|
||||
"""Test address preference for IPv4."""
|
||||
addr_info = (
|
||||
socket.AF_INET,
|
||||
socket.SOCK_STREAM,
|
||||
socket.IPPROTO_TCP,
|
||||
"",
|
||||
("192.168.1.1", 6053),
|
||||
)
|
||||
assert helpers.addr_preference_(addr_info) == 2
|
||||
|
||||
|
||||
def test_addr_preference_ipv6() -> None:
|
||||
"""Test address preference for regular IPv6."""
|
||||
addr_info = (
|
||||
socket.AF_INET6,
|
||||
socket.SOCK_STREAM,
|
||||
socket.IPPROTO_TCP,
|
||||
"",
|
||||
("2001:db8::1", 6053, 0, 0),
|
||||
)
|
||||
assert helpers.addr_preference_(addr_info) == 1
|
||||
|
||||
|
||||
def test_addr_preference_ipv6_link_local_no_scope() -> None:
|
||||
"""Test address preference for link-local IPv6 without scope."""
|
||||
addr_info = (
|
||||
socket.AF_INET6,
|
||||
socket.SOCK_STREAM,
|
||||
socket.IPPROTO_TCP,
|
||||
"",
|
||||
("fe80::1", 6053, 0, 0), # link-local with scope_id=0
|
||||
)
|
||||
assert helpers.addr_preference_(addr_info) == 3
|
||||
|
||||
|
||||
def test_addr_preference_ipv6_link_local_with_scope() -> None:
|
||||
"""Test address preference for link-local IPv6 with scope."""
|
||||
addr_info = (
|
||||
socket.AF_INET6,
|
||||
socket.SOCK_STREAM,
|
||||
socket.IPPROTO_TCP,
|
||||
"",
|
||||
("fe80::1", 6053, 0, 2), # link-local with scope_id=2
|
||||
)
|
||||
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
|
||||
|
||||
|
||||
def test_resolve_ip_address_sorting() -> None:
|
||||
"""Test that results are sorted by preference."""
|
||||
# Create multiple address infos with different preferences
|
||||
mock_addr_infos = [
|
||||
AddrInfo(
|
||||
family=socket.AF_INET6,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv6Sockaddr(
|
||||
address="fe80::1", port=6053, flowinfo=0, scope_id=0
|
||||
), # Preference 3 (link-local no scope)
|
||||
),
|
||||
AddrInfo(
|
||||
family=socket.AF_INET,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv4Sockaddr(
|
||||
address="192.168.1.100", port=6053
|
||||
), # Preference 2 (IPv4)
|
||||
),
|
||||
AddrInfo(
|
||||
family=socket.AF_INET6,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv6Sockaddr(
|
||||
address="2001:db8::1", port=6053, flowinfo=0, scope_id=0
|
||||
), # Preference 1 (IPv6)
|
||||
),
|
||||
]
|
||||
|
||||
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||
mock_resolver = MockResolver.return_value
|
||||
mock_resolver.resolve.return_value = mock_addr_infos
|
||||
|
||||
result = helpers.resolve_ip_address("test.local", 6053)
|
||||
|
||||
# Should be sorted: IPv6 first, then IPv4, then link-local without scope
|
||||
assert result[0][4][0] == "2001:db8::1" # IPv6 (preference 1)
|
||||
assert result[1][4][0] == "192.168.1.100" # IPv4 (preference 2)
|
||||
assert result[2][4][0] == "fe80::1" # Link-local no scope (preference 3)
|
||||
|
169
tests/unit_tests/test_resolver.py
Normal file
169
tests/unit_tests/test_resolver.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Tests for the DNS resolver module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import socket
|
||||
from unittest.mock import patch
|
||||
|
||||
from aioesphomeapi.core import ResolveAPIError, ResolveTimeoutAPIError
|
||||
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
||||
import pytest
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.resolver import RESOLVE_TIMEOUT, AsyncResolver
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_addr_info_ipv4() -> AddrInfo:
|
||||
"""Create a mock IPv4 AddrInfo."""
|
||||
return AddrInfo(
|
||||
family=socket.AF_INET,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv4Sockaddr(address="192.168.1.100", port=6053),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_addr_info_ipv6() -> AddrInfo:
|
||||
"""Create a mock IPv6 AddrInfo."""
|
||||
return AddrInfo(
|
||||
family=socket.AF_INET6,
|
||||
type=socket.SOCK_STREAM,
|
||||
proto=socket.IPPROTO_TCP,
|
||||
sockaddr=IPv6Sockaddr(address="2001:db8::1", port=6053, flowinfo=0, scope_id=0),
|
||||
)
|
||||
|
||||
|
||||
def test_async_resolver_successful_resolution(mock_addr_info_ipv4: AddrInfo) -> None:
|
||||
"""Test successful DNS resolution."""
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
return_value=[mock_addr_info_ipv4],
|
||||
) as mock_resolve:
|
||||
resolver = AsyncResolver(["test.local"], 6053)
|
||||
result = resolver.resolve()
|
||||
|
||||
assert result == [mock_addr_info_ipv4]
|
||||
mock_resolve.assert_called_once_with(
|
||||
["test.local"], 6053, timeout=RESOLVE_TIMEOUT
|
||||
)
|
||||
|
||||
|
||||
def test_async_resolver_multiple_hosts(
|
||||
mock_addr_info_ipv4: AddrInfo, mock_addr_info_ipv6: AddrInfo
|
||||
) -> None:
|
||||
"""Test resolving multiple hosts."""
|
||||
mock_results = [mock_addr_info_ipv4, mock_addr_info_ipv6]
|
||||
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
return_value=mock_results,
|
||||
) as mock_resolve:
|
||||
resolver = AsyncResolver(["test1.local", "test2.local"], 6053)
|
||||
result = resolver.resolve()
|
||||
|
||||
assert result == mock_results
|
||||
mock_resolve.assert_called_once_with(
|
||||
["test1.local", "test2.local"], 6053, timeout=RESOLVE_TIMEOUT
|
||||
)
|
||||
|
||||
|
||||
def test_async_resolver_resolve_api_error() -> None:
|
||||
"""Test handling of ResolveAPIError."""
|
||||
error_msg = "Failed to resolve"
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
side_effect=ResolveAPIError(error_msg),
|
||||
):
|
||||
resolver = AsyncResolver(["test.local"], 6053)
|
||||
with pytest.raises(
|
||||
EsphomeError, match=re.escape(f"Error resolving IP address: {error_msg}")
|
||||
):
|
||||
resolver.resolve()
|
||||
|
||||
|
||||
def test_async_resolver_timeout_error() -> None:
|
||||
"""Test handling of ResolveTimeoutAPIError."""
|
||||
error_msg = "Resolution timed out"
|
||||
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
side_effect=ResolveTimeoutAPIError(error_msg),
|
||||
):
|
||||
resolver = AsyncResolver(["test.local"], 6053)
|
||||
# Match either "Timeout" or "Error" since ResolveTimeoutAPIError is a subclass of ResolveAPIError
|
||||
# and depending on import order/test execution context, it might be caught as either
|
||||
with pytest.raises(
|
||||
EsphomeError,
|
||||
match=f"(Timeout|Error) resolving IP address: {re.escape(error_msg)}",
|
||||
):
|
||||
resolver.resolve()
|
||||
|
||||
|
||||
def test_async_resolver_generic_exception() -> None:
|
||||
"""Test handling of generic exceptions."""
|
||||
error = RuntimeError("Unexpected error")
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
side_effect=error,
|
||||
):
|
||||
resolver = AsyncResolver(["test.local"], 6053)
|
||||
with pytest.raises(RuntimeError, match="Unexpected error"):
|
||||
resolver.resolve()
|
||||
|
||||
|
||||
def test_async_resolver_thread_timeout() -> None:
|
||||
"""Test timeout when thread doesn't complete in time."""
|
||||
# Mock the start method to prevent actual thread execution
|
||||
with (
|
||||
patch.object(AsyncResolver, "start"),
|
||||
patch("esphome.resolver.hr.async_resolve_host"),
|
||||
):
|
||||
resolver = AsyncResolver(["test.local"], 6053)
|
||||
# Override event.wait to simulate timeout (return False = timeout occurred)
|
||||
with (
|
||||
patch.object(resolver.event, "wait", return_value=False),
|
||||
pytest.raises(
|
||||
EsphomeError, match=re.escape("Timeout resolving IP address")
|
||||
),
|
||||
):
|
||||
resolver.resolve()
|
||||
|
||||
# Verify thread start was called
|
||||
resolver.start.assert_called_once()
|
||||
|
||||
|
||||
def test_async_resolver_ip_addresses(mock_addr_info_ipv4: AddrInfo) -> None:
|
||||
"""Test resolving IP addresses."""
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
return_value=[mock_addr_info_ipv4],
|
||||
) as mock_resolve:
|
||||
resolver = AsyncResolver(["192.168.1.100"], 6053)
|
||||
result = resolver.resolve()
|
||||
|
||||
assert result == [mock_addr_info_ipv4]
|
||||
mock_resolve.assert_called_once_with(
|
||||
["192.168.1.100"], 6053, timeout=RESOLVE_TIMEOUT
|
||||
)
|
||||
|
||||
|
||||
def test_async_resolver_mixed_addresses(
|
||||
mock_addr_info_ipv4: AddrInfo, mock_addr_info_ipv6: AddrInfo
|
||||
) -> None:
|
||||
"""Test resolving mix of hostnames and IP addresses."""
|
||||
mock_results = [mock_addr_info_ipv4, mock_addr_info_ipv6]
|
||||
|
||||
with patch(
|
||||
"esphome.resolver.hr.async_resolve_host",
|
||||
return_value=mock_results,
|
||||
) as mock_resolve:
|
||||
resolver = AsyncResolver(["test.local", "192.168.1.100", "::1"], 6053)
|
||||
result = resolver.resolve()
|
||||
|
||||
assert result == mock_results
|
||||
mock_resolve.assert_called_once_with(
|
||||
["test.local", "192.168.1.100", "::1"], 6053, timeout=RESOLVE_TIMEOUT
|
||||
)
|
Reference in New Issue
Block a user