"""Integration test for API conditional memory optimization with triggers and services.""" from __future__ import annotations import asyncio from aioesphomeapi import ( BinarySensorInfo, EntityState, SensorInfo, TextSensorInfo, UserService, UserServiceArgType, ) import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @pytest.mark.asyncio async def test_api_conditional_memory( yaml_config: str, run_compiled: RunCompiledFunction, api_client_connected: APIClientConnectedFactory, ) -> None: """Test API triggers and services work correctly with conditional compilation.""" loop = asyncio.get_running_loop() # Keep ESPHome process running throughout the test async with run_compiled(yaml_config): # First connection async with api_client_connected() as client: # Verify device info device_info = await client.device_info() assert device_info is not None assert device_info.name == "api-conditional-memory-test" # List entities and services entity_info, services = await asyncio.wait_for( client.list_entities_services(), timeout=5.0 ) # Find our entities client_connected: BinarySensorInfo | None = None client_disconnected_event: BinarySensorInfo | None = None service_called_sensor: BinarySensorInfo | None = None service_arg_sensor: SensorInfo | None = None last_client_info: TextSensorInfo | None = None for entity in entity_info: if isinstance(entity, BinarySensorInfo): if entity.object_id == "client_connected": client_connected = entity elif entity.object_id == "client_disconnected_event": client_disconnected_event = entity elif entity.object_id == "service_called": service_called_sensor = entity elif isinstance(entity, SensorInfo): if entity.object_id == "service_argument_value": service_arg_sensor = entity elif isinstance(entity, TextSensorInfo): if entity.object_id == "last_client_info": last_client_info = entity # Verify all entities exist assert client_connected is not None, "client_connected sensor not found" assert client_disconnected_event is not None, ( "client_disconnected_event sensor not found" ) assert service_called_sensor is not None, "service_called sensor not found" assert service_arg_sensor is not None, "service_arg_sensor not found" assert last_client_info is not None, "last_client_info sensor not found" # Verify services exist assert len(services) == 2, f"Expected 2 services, found {len(services)}" # Find our services simple_service: UserService | None = None service_with_args: UserService | None = None for service in services: if service.name == "test_simple_service": simple_service = service elif service.name == "test_service_with_args": service_with_args = service assert simple_service is not None, "test_simple_service not found" assert service_with_args is not None, "test_service_with_args not found" # Verify service arguments assert len(service_with_args.args) == 4, ( f"Expected 4 args, found {len(service_with_args.args)}" ) # Check arg types arg_types = {arg.name: arg.type for arg in service_with_args.args} assert arg_types["arg_string"] == UserServiceArgType.STRING assert arg_types["arg_int"] == UserServiceArgType.INT assert arg_types["arg_bool"] == UserServiceArgType.BOOL assert arg_types["arg_float"] == UserServiceArgType.FLOAT # Track state changes states: dict[int, EntityState] = {} states_future: asyncio.Future[None] = loop.create_future() def on_state(state: EntityState) -> None: states[state.key] = state # Check if we have initial states for connection sensors if ( client_connected.key in states and last_client_info.key in states and not states_future.done() ): states_future.set_result(None) client.subscribe_states(on_state) # Wait for initial states await asyncio.wait_for(states_future, timeout=5.0) # Verify on_client_connected trigger fired connected_state = states.get(client_connected.key) assert connected_state is not None assert connected_state.state is True, "Client should be connected" # Verify client info was captured client_info_state = states.get(last_client_info.key) assert client_info_state is not None assert isinstance(client_info_state.state, str) assert len(client_info_state.state) > 0, "Client info should not be empty" # Test simple service service_future: asyncio.Future[None] = loop.create_future() def check_service_called(state: EntityState) -> None: if state.key == service_called_sensor.key and state.state is True: if not service_future.done(): service_future.set_result(None) # Update callback to check for service execution client.subscribe_states(check_service_called) # Call simple service client.execute_service(simple_service, {}) # Wait for service to execute await asyncio.wait_for(service_future, timeout=5.0) # Test service with arguments arg_future: asyncio.Future[None] = loop.create_future() expected_float = 42.5 def check_arg_sensor(state: EntityState) -> None: if ( state.key == service_arg_sensor.key and abs(state.state - expected_float) < 0.01 ): if not arg_future.done(): arg_future.set_result(None) client.subscribe_states(check_arg_sensor) # Call service with arguments client.execute_service( service_with_args, { "arg_string": "test_string", "arg_int": 123, "arg_bool": True, "arg_float": expected_float, }, ) # Wait for service with args to execute await asyncio.wait_for(arg_future, timeout=5.0) # After disconnecting first client, reconnect and verify triggers work async with api_client_connected() as client2: # Subscribe to states with new client states2: dict[int, EntityState] = {} connected_future: asyncio.Future[None] = loop.create_future() def on_state2(state: EntityState) -> None: states2[state.key] = state # Check for reconnection if state.key == client_connected.key and state.state is True: if not connected_future.done(): connected_future.set_result(None) client2.subscribe_states(on_state2) # Wait for connected state await asyncio.wait_for(connected_future, timeout=5.0) # Verify client is connected again (on_client_connected fired) assert states2[client_connected.key].state is True, ( "Client should be reconnected" ) # The client_disconnected_event should be ON from when we disconnected # (it was set ON by on_client_disconnected trigger) disconnected_state = states2.get(client_disconnected_event.key) assert disconnected_state is not None assert disconnected_state.state is True, ( "Disconnect event should be ON from previous disconnect" )