1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-30 14:43:51 +00:00

Merge branch 'dev' into multi_device

This commit is contained in:
J. Nick Koston
2025-05-29 13:43:21 -05:00
committed by GitHub
414 changed files with 4087 additions and 1333 deletions

View File

@@ -19,6 +19,7 @@ alarm_control_panel:
- input: bin1
bypass_armed_home: true
bypass_armed_night: true
bypass_auto: true
on_state:
then:
- lambda: !lambda |-
@@ -38,6 +39,7 @@ alarm_control_panel:
- input: bin1
bypass_armed_home: true
bypass_armed_night: true
bypass_auto: true
on_disarmed:
then:
- logger.log: "### DISARMED ###"

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO6
sda_pin: GPIO7
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,25 @@
esphome:
on_boot:
then:
- audio_dac.mute_off:
- audio_dac.mute_on:
- audio_dac.set_volume:
volume: 50%
i2c:
- id: i2c_es8388
scl: ${scl_pin}
sda: ${sda_pin}
audio_dac:
- platform: es8388
id: es8388_parent
select:
- platform: es8388
es8388_id: es8388_parent
dac_output:
name: "DAC Output"
adc_input_mic:
name: "ADC Input MIC"

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -11,6 +11,13 @@ online_image:
format: PNG
type: BINARY
resize: 50x50
on_download_finished:
lambda: |-
if (cached) {
ESP_LOGD("online_image", "Cache hit: using cached image");
} else {
ESP_LOGD("online_image", "Cache miss: fresh download");
}
- id: online_binary_transparent_image
url: http://www.libpng.org/pub/png/img_png/pnglogo-blk-tiny.png
type: BINARY

View File

@@ -6,6 +6,12 @@ i2c:
sx1509:
- id: sx1509_hub
address: 0x3E
keypad:
key_rows: 2
key_columns: 2
keys: abcd
on_key:
- lambda: ESP_LOGD("test", "got key '%c'", x);
binary_sensor:
- platform: gpio
@@ -13,6 +19,11 @@ binary_sensor:
pin:
sx1509: sx1509_hub
number: 3
- platform: sx1509
sx1509_id: sx1509_hub
name: "keypadkey_0"
row: 0
col: 0
switch:
- platform: gpio

View File

@@ -53,6 +53,8 @@ async def dashboard() -> DashboardTestHelper:
assert DASHBOARD.settings.on_ha_addon is True
assert DASHBOARD.settings.using_auth is False
task = asyncio.create_task(DASHBOARD.async_run())
# Wait for initial device loading to complete
await DASHBOARD.entries.async_request_update_entries()
client = AsyncHTTPClient()
io_loop = IOLoop(make_current=False)
yield DashboardTestHelper(io_loop, client, port)

View File

@@ -0,0 +1,80 @@
# ESPHome Integration Tests
This directory contains end-to-end integration tests for ESPHome, focusing on testing the complete flow from YAML configuration to running devices with API connections.
## Structure
- `conftest.py` - Common fixtures and utilities
- `const.py` - Constants used throughout the integration tests
- `types.py` - Type definitions for fixtures and functions
- `fixtures/` - YAML configuration files for tests
- `test_*.py` - Individual test files
## How it works
### Automatic YAML Loading
The `yaml_config` fixture automatically loads YAML configurations based on the test name:
- It looks for a file named after the test function (e.g., `test_host_mode_basic``fixtures/host_mode_basic.yaml`)
- The fixture file must exist or the test will fail with a clear error message
- The fixture automatically injects a dynamic port number into the API configuration
### Key Fixtures
- `run_compiled` - Combines write, compile, and run operations into a single context manager
- `api_client_connected` - Creates an API client that automatically connects using ReconnectLogic
- `reserved_tcp_port` - Reserves a TCP port by holding the socket open until ESPHome needs it
- `unused_tcp_port` - Provides the reserved port number for each test
### Writing Tests
The simplest way to write a test is to use the `run_compiled` and `api_client_connected` fixtures:
```python
@pytest.mark.asyncio
async def test_my_feature(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Test your feature using the connected client
device_info = await client.device_info()
assert device_info is not None
```
### Creating YAML Fixtures
Create a YAML file in the `fixtures/` directory with the same name as your test function (without the `test_` prefix):
```yaml
# fixtures/my_feature.yaml
esphome:
name: my-test-device
host:
api: # Port will be automatically injected
logger:
# Add your components here
```
## Running Tests
```bash
# Run all integration tests
script/integration_test
# Run a specific test
pytest -vv tests/integration/test_host_mode_basic.py
# Debug compilation errors or see ESPHome output
pytest -s tests/integration/test_host_mode_basic.py
```
## Implementation Details
- Tests automatically wait for the API port to be available before connecting
- Process cleanup is handled automatically, with graceful shutdown using SIGINT
- Each test gets its own temporary directory and unique port
- Port allocation minimizes race conditions by holding the socket until just before ESPHome starts
- Output from ESPHome processes is displayed for debugging

View File

@@ -0,0 +1,3 @@
"""ESPHome integration tests."""
from __future__ import annotations

View File

@@ -0,0 +1,531 @@
"""Common fixtures for integration tests."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
import logging
import os
from pathlib import Path
import platform
import signal
import socket
import sys
import tempfile
from typing import TextIO
from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic
import pytest
import pytest_asyncio
import esphome.config
from esphome.core import CORE
from esphome.platformio_api import get_idedata
from .const import (
API_CONNECTION_TIMEOUT,
DEFAULT_API_PORT,
LOCALHOST,
PORT_POLL_INTERVAL,
PORT_WAIT_TIMEOUT,
SIGINT_TIMEOUT,
SIGTERM_TIMEOUT,
)
from .types import (
APIClientConnectedFactory,
APIClientFactory,
CompileFunction,
ConfigWriter,
RunCompiledFunction,
)
# Skip all integration tests on Windows
if platform.system() == "Windows":
pytest.skip(
"Integration tests are not supported on Windows", allow_module_level=True
)
import pty # not available on Windows
@pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging():
"""Enable debug logging for aioesphomeapi to help diagnose connection issues."""
# Get the aioesphomeapi logger
logger = logging.getLogger("aioesphomeapi")
# Save the original level
original_level = logger.level
# Set to DEBUG level
logger.setLevel(logging.DEBUG)
# Also ensure we have a handler that outputs to console
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
yield
# Restore original level
logger.setLevel(original_level)
@pytest.fixture
def integration_test_dir() -> Generator[Path]:
"""Create a temporary directory for integration tests."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def reserved_tcp_port() -> Generator[tuple[int, socket.socket]]:
"""Reserve an unused TCP port by holding the socket open."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("", 0))
port = s.getsockname()[1]
try:
yield port, s
finally:
s.close()
@pytest.fixture
def unused_tcp_port(reserved_tcp_port: tuple[int, socket.socket]) -> int:
"""Get the reserved TCP port number."""
return reserved_tcp_port[0]
@pytest_asyncio.fixture
async def yaml_config(request: pytest.FixtureRequest, unused_tcp_port: int) -> str:
"""Load YAML configuration based on test name."""
# Get the test function name
test_name: str = request.node.name
# Extract the base test name (remove test_ prefix and any parametrization)
base_name = test_name.replace("test_", "").partition("[")[0]
# Load the fixture file
fixture_path = Path(__file__).parent / "fixtures" / f"{base_name}.yaml"
if not fixture_path.exists():
raise FileNotFoundError(f"Fixture file not found: {fixture_path}")
loop = asyncio.get_running_loop()
content = await loop.run_in_executor(None, fixture_path.read_text)
# Replace the port in the config if it contains api section
if "api:" in content:
# Add port configuration after api:
content = content.replace("api:", f"api:\n port: {unused_tcp_port}")
return content
@pytest_asyncio.fixture
async def write_yaml_config(
integration_test_dir: Path, request: pytest.FixtureRequest
) -> AsyncGenerator[ConfigWriter]:
"""Write YAML configuration to a file."""
# Get the test name for default filename
test_name = request.node.name
base_name = test_name.replace("test_", "").split("[")[0]
async def _write_config(content: str, filename: str | None = None) -> Path:
if filename is None:
filename = f"{base_name}.yaml"
config_path = integration_test_dir / filename
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, config_path.write_text, content)
return config_path
yield _write_config
@pytest_asyncio.fixture
async def compile_esphome(
integration_test_dir: Path,
) -> AsyncGenerator[CompileFunction]:
"""Compile an ESPHome configuration and return the binary path."""
async def _compile(config_path: Path) -> Path:
# Retry compilation up to 3 times if we get a segfault
max_retries = 3
for attempt in range(max_retries):
# Compile using subprocess, inheriting stdout/stderr to show progress
proc = await asyncio.create_subprocess_exec(
"esphome",
"compile",
str(config_path),
cwd=integration_test_dir,
stdout=None, # Inherit stdout
stderr=None, # Inherit stderr
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
)
await proc.wait()
if proc.returncode == 0:
# Success!
break
elif proc.returncode == -11 and attempt < max_retries - 1:
# Segfault (-11 = SIGSEGV), retry
print(
f"Compilation segfaulted (attempt {attempt + 1}/{max_retries}), retrying..."
)
await asyncio.sleep(1) # Brief pause before retry
continue
else:
# Other error or final retry
raise RuntimeError(
f"Failed to compile {config_path}, return code: {proc.returncode}. "
f"Run with 'pytest -s' to see compilation output."
)
# Load the config to get idedata (blocking call, must use executor)
loop = asyncio.get_running_loop()
def _read_config_and_get_binary():
CORE.config_path = str(config_path)
config = esphome.config.read_config(
{"command": "compile", "config": str(config_path)}
)
if config is None:
raise RuntimeError(f"Failed to read config from {config_path}")
# Get the compiled binary path
idedata = get_idedata(config)
return Path(idedata.firmware_elf_path)
binary_path = await loop.run_in_executor(None, _read_config_and_get_binary)
if not binary_path.exists():
raise RuntimeError(f"Compiled binary not found at {binary_path}")
return binary_path
yield _compile
@asynccontextmanager
async def create_api_client(
address: str = LOCALHOST,
port: int = DEFAULT_API_PORT,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AsyncGenerator[APIClient]:
"""Create an API client context manager."""
client = APIClient(
address=address,
port=port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
try:
yield client
finally:
await client.disconnect()
@pytest_asyncio.fixture
async def api_client_factory(
unused_tcp_port: int,
) -> AsyncGenerator[APIClientFactory]:
"""Factory for creating API client context managers."""
def _create_client(
address: str = LOCALHOST,
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AbstractAsyncContextManager[APIClient]:
return create_api_client(
address=address,
port=port if port is not None else unused_tcp_port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
yield _create_client
@asynccontextmanager
async def wait_and_connect_api_client(
address: str = LOCALHOST,
port: int = DEFAULT_API_PORT,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = API_CONNECTION_TIMEOUT,
) -> AsyncGenerator[APIClient]:
"""Wait for API to be available and connect."""
client = APIClient(
address=address,
port=port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
# Create a future to signal when connected
loop = asyncio.get_running_loop()
connected_future: asyncio.Future[None] = loop.create_future()
async def on_connect() -> None:
"""Called when successfully connected."""
if not connected_future.done():
connected_future.set_result(None)
async def on_disconnect(expected_disconnect: bool) -> None:
"""Called when disconnected."""
if not connected_future.done() and not expected_disconnect:
connected_future.set_exception(
APIConnectionError("Disconnected before fully connected")
)
async def on_connect_error(err: Exception) -> None:
"""Called when connection fails."""
if not connected_future.done():
connected_future.set_exception(err)
# Create and start the reconnect logic
reconnect_logic = ReconnectLogic(
client=client,
on_connect=on_connect,
on_disconnect=on_disconnect,
zeroconf_instance=None, # Not using zeroconf for integration tests
name=f"{address}:{port}",
on_connect_error=on_connect_error,
)
try:
# Start the connection
await reconnect_logic.start()
# Wait for connection with timeout
try:
await asyncio.wait_for(connected_future, timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Failed to connect to API after {timeout} seconds")
yield client
finally:
# Stop reconnect logic and disconnect
await reconnect_logic.stop()
await client.disconnect()
@pytest_asyncio.fixture
async def api_client_connected(
unused_tcp_port: int,
) -> AsyncGenerator[APIClientConnectedFactory]:
"""Factory for creating connected API client context managers."""
def _connect_client(
address: str = LOCALHOST,
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = API_CONNECTION_TIMEOUT,
) -> AbstractAsyncContextManager[APIClient]:
return wait_and_connect_api_client(
address=address,
port=port if port is not None else unused_tcp_port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
timeout=timeout,
)
yield _connect_client
async def _read_stream_lines(
stream: asyncio.StreamReader, lines: list[str], output_stream: TextIO
) -> None:
"""Read lines from a stream, append to list, and echo to output stream."""
while line := await stream.readline():
decoded_line = line.decode("utf-8", errors="replace")
lines.append(decoded_line.rstrip())
# Echo to stdout/stderr in real-time
print(decoded_line.rstrip(), file=output_stream, flush=True)
@asynccontextmanager
async def run_binary_and_wait_for_port(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
# Create a pseudo-terminal to make the binary think it's running interactively
# This is needed because the ESPHome host logger checks isatty()
controller_fd, device_fd = pty.openpty()
# Run the compiled binary with PTY
process = await asyncio.create_subprocess_exec(
str(binary_path),
stdout=device_fd,
stderr=device_fd,
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
pass_fds=(device_fd,),
)
# Close the device end in the parent process
os.close(device_fd)
# Convert controller_fd to async streams for reading
loop = asyncio.get_running_loop()
controller_reader = asyncio.StreamReader()
controller_protocol = asyncio.StreamReaderProtocol(controller_reader)
controller_transport, _ = await loop.connect_read_pipe(
lambda: controller_protocol, os.fdopen(controller_fd, "rb", 0)
)
output_reader = controller_reader
if process.returncode is not None:
raise RuntimeError(
f"Process died immediately with return code {process.returncode}. "
"Ensure the binary is valid and can run successfully."
)
# Wait for the API server to start listening
loop = asyncio.get_running_loop()
start_time = loop.time()
# Start collecting output
stdout_lines: list[str] = []
output_tasks: list[asyncio.Task] = []
try:
# Read from output stream
output_tasks = [
asyncio.create_task(
_read_stream_lines(output_reader, stdout_lines, sys.stdout)
)
]
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
_, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
# Port is open, yield control
yield
return
except (ConnectionRefusedError, OSError):
# Check if process died
if process.returncode is not None:
break
# Port not open yet, wait a bit and try again
await asyncio.sleep(PORT_POLL_INTERVAL)
# Timeout or process died - build error message
error_msg = f"Port {port} on {host} did not open within {timeout} seconds"
if process.returncode is not None:
error_msg += f"\nProcess exited with code: {process.returncode}"
# Include any output collected so far
if stdout_lines:
error_msg += "\n\n--- Process Output ---\n"
error_msg += "\n".join(stdout_lines[-100:]) # Last 100 lines
raise TimeoutError(error_msg)
finally:
# Cancel output collection tasks
for task in output_tasks:
task.cancel()
# Wait for tasks to complete and check for exceptions
results = await asyncio.gather(*output_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(
result, asyncio.CancelledError
):
print(
f"Error reading from PTY: {result}",
file=sys.stderr,
)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except asyncio.TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except asyncio.TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def run_compiled_context(
yaml_content: str,
filename: str | None,
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
port: int,
port_socket: socket.socket | None = None,
) -> AsyncGenerator[None]:
"""Context manager to write, compile and run an ESPHome configuration."""
# Write the YAML config
config_path = await write_yaml_config(yaml_content, filename)
# Compile the configuration and get binary path
binary_path = await compile_esphome(config_path)
# Close the port socket right before running to release the port
if port_socket is not None:
port_socket.close()
# Run the binary and wait for the API server to start
async with run_binary_and_wait_for_port(binary_path, LOCALHOST, port):
yield
@pytest_asyncio.fixture
async def run_compiled(
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
reserved_tcp_port: tuple[int, socket.socket],
) -> AsyncGenerator[RunCompiledFunction]:
"""Write, compile and run an ESPHome configuration."""
port, port_socket = reserved_tcp_port
def _run_compiled(
yaml_content: str, filename: str | None = None
) -> AbstractAsyncContextManager[asyncio.subprocess.Process]:
return run_compiled_context(
yaml_content,
filename,
write_yaml_config,
compile_esphome,
port,
port_socket,
)
yield _run_compiled

View File

@@ -0,0 +1,14 @@
"""Constants for integration tests."""
# Network constants
DEFAULT_API_PORT = 6053
LOCALHOST = "127.0.0.1"
# Timeout constants
API_CONNECTION_TIMEOUT = 30.0 # seconds
PORT_WAIT_TIMEOUT = 30.0 # seconds
PORT_POLL_INTERVAL = 0.1 # seconds
# Process shutdown timeouts
SIGINT_TIMEOUT = 5.0 # seconds
SIGTERM_TIMEOUT = 2.0 # seconds

View File

@@ -0,0 +1,5 @@
esphome:
name: host-test
host:
api:
logger:

View File

@@ -0,0 +1,7 @@
esphome:
name: host-noise-test
host:
api:
encryption:
key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU=
logger:

View File

@@ -0,0 +1,7 @@
esphome:
name: host-noise-test
host:
api:
encryption:
key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU=
logger:

View File

@@ -0,0 +1,5 @@
esphome:
name: host-reconnect-test
host:
api:
logger:

View File

@@ -0,0 +1,12 @@
esphome:
name: host-sensor-test
host:
api:
logger:
sensor:
- platform: template
name: Test Sensor
id: test_sensor
unit_of_measurement: °C
lambda: return 42.0;
update_interval: 0.1s

View File

@@ -0,0 +1,22 @@
"""Basic integration test for Host mode."""
from __future__ import annotations
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_basic(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic Host mode functionality with API connection."""
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Verify we can get device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-test"

View File

@@ -0,0 +1,53 @@
"""Integration test for Host mode with noise encryption."""
from __future__ import annotations
from aioesphomeapi import InvalidEncryptionKeyAPIError
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
# The API key for noise encryption
NOISE_KEY = "N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU="
@pytest.mark.asyncio
async def test_host_mode_noise_encryption(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test Host mode with noise encryption enabled."""
# Write, compile and run the ESPHome device, then connect to API
# The API client should handle noise encryption automatically
async with (
run_compiled(yaml_config),
api_client_connected(noise_psk=NOISE_KEY) as client,
):
# If we can get device info, the encryption is working
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-noise-test"
# List entities to ensure the encrypted connection is fully functional
entities = await client.list_entities_services()
assert entities is not None
@pytest.mark.asyncio
async def test_host_mode_noise_encryption_wrong_key(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that connection fails with wrong encryption key."""
# Write, compile and run the ESPHome device
async with run_compiled(yaml_config):
# Try to connect with wrong key - should fail with InvalidEncryptionKeyAPIError
with pytest.raises(InvalidEncryptionKeyAPIError):
async with api_client_connected(
noise_psk="wrong_key_that_should_not_work",
timeout=5, # Shorter timeout for expected failure
) as client:
# This should not be reached
await client.device_info()

View File

@@ -0,0 +1,28 @@
"""Integration test for Host mode reconnection."""
from __future__ import annotations
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_reconnect(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test reconnecting to a Host mode device."""
# Write, compile and run the ESPHome device
async with run_compiled(yaml_config):
# First connection
async with api_client_connected() as client:
device_info = await client.device_info()
assert device_info is not None
# Reconnect with a new client
async with api_client_connected() as client2:
device_info2 = await client2.device_info()
assert device_info2 is not None
assert device_info2.name == device_info.name

View File

@@ -0,0 +1,49 @@
"""Integration test for Host mode with sensor."""
from __future__ import annotations
import asyncio
from aioesphomeapi import EntityState
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_with_sensor(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test Host mode with a sensor component."""
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to state changes
states: dict[int, EntityState] = {}
sensor_future: asyncio.Future[EntityState] = asyncio.Future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# If this is our sensor with value 42.0, resolve the future
if (
hasattr(state, "state")
and state.state == 42.0
and not sensor_future.done()
):
sensor_future.set_result(state)
client.subscribe_states(on_state)
# Wait for sensor with specific value (42.0) with timeout
try:
test_sensor_state = await asyncio.wait_for(sensor_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(
f"Sensor with value 42.0 not received within 5 seconds. "
f"Received states: {list(states.values())}"
)
# Verify the sensor state
assert test_sensor_state.state == 42.0
assert len(states) > 0, "No states received"

View File

@@ -0,0 +1,44 @@
"""Type definitions for integration tests."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol
from aioesphomeapi import APIClient
ConfigWriter = Callable[[str, str | None], Awaitable[Path]]
CompileFunction = Callable[[Path], Awaitable[Path]]
RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]]
RunCompiledFunction = Callable[[str, str | None], AbstractAsyncContextManager[None]]
WaitFunction = Callable[[APIClient, float], Awaitable[bool]]
class APIClientFactory(Protocol):
"""Protocol for API client factory."""
def __call__( # noqa: E704
self,
address: str = "localhost",
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AbstractAsyncContextManager[APIClient]: ...
class APIClientConnectedFactory(Protocol):
"""Protocol for connected API client factory."""
def __call__( # noqa: E704
self,
address: str = "localhost",
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = 30,
) -> AbstractAsyncContextManager[APIClient]: ...