1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-15 14:25:45 +00:00

Compare commits

..

3 Commits

Author SHA1 Message Date
J. Nick Koston
e8f2e91db3 [sntp] Merge multiple instances to fix crash and undefined behavior 2025-11-14 08:47:15 -06:00
dependabot[bot]
67524e14ee Bump pylint from 4.0.2 to 4.0.3 (#11894)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-11-13 19:05:02 +00:00
Edward Firmo
2290eb0dd2 [light] Fix missing ColorMode::BRIGHTNESS case in logging (#11836) 2025-11-13 12:08:06 -06:00
11 changed files with 354 additions and 300 deletions

View File

@@ -52,8 +52,10 @@ static void log_invalid_parameter(const char *name, const LogString *message) {
}
static const LogString *color_mode_to_human(ColorMode color_mode) {
if (color_mode == ColorMode::UNKNOWN)
return LOG_STR("Unknown");
if (color_mode == ColorMode::ON_OFF)
return LOG_STR("On/Off");
if (color_mode == ColorMode::BRIGHTNESS)
return LOG_STR("Brightness");
if (color_mode == ColorMode::WHITE)
return LOG_STR("White");
if (color_mode == ColorMode::COLOR_TEMPERATURE)
@@ -68,7 +70,7 @@ static const LogString *color_mode_to_human(ColorMode color_mode) {
return LOG_STR("RGB + cold/warm white");
if (color_mode == ColorMode::RGB_COLOR_TEMPERATURE)
return LOG_STR("RGB + color temperature");
return LOG_STR("");
return LOG_STR("Unknown");
}
// Helper to log percentage values

View File

@@ -1,9 +1,14 @@
import logging
import esphome.codegen as cg
from esphome.components import time as time_
from esphome.config_helpers import merge_config
import esphome.config_validation as cv
from esphome.const import (
CONF_ID,
CONF_PLATFORM,
CONF_SERVERS,
CONF_TIME,
PLATFORM_BK72XX,
PLATFORM_ESP32,
PLATFORM_ESP8266,
@@ -12,13 +17,74 @@ from esphome.const import (
PLATFORM_RTL87XX,
)
from esphome.core import CORE
import esphome.final_validate as fv
from esphome.types import ConfigType
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ["network"]
CONF_SNTP = "sntp"
sntp_ns = cg.esphome_ns.namespace("sntp")
SNTPComponent = sntp_ns.class_("SNTPComponent", time_.RealTimeClock)
DEFAULT_SERVERS = ["0.pool.ntp.org", "1.pool.ntp.org", "2.pool.ntp.org"]
def _sntp_final_validate(config: ConfigType) -> None:
"""Merge multiple SNTP instances into one, similar to OTA merging behavior."""
full_conf = fv.full_config.get()
time_confs = full_conf.get(CONF_TIME, [])
sntp_configs: list[ConfigType] = []
other_time_configs: list[ConfigType] = []
for time_conf in time_confs:
if time_conf.get(CONF_PLATFORM) == CONF_SNTP:
sntp_configs.append(time_conf)
else:
other_time_configs.append(time_conf)
if len(sntp_configs) <= 1:
return
# Merge all SNTP configs into the first one
merged = sntp_configs[0]
for sntp_conf in sntp_configs[1:]:
# Validate that IDs are consistent if manually specified
if merged[CONF_ID].is_manual and sntp_conf[CONF_ID].is_manual:
raise cv.Invalid(
f"Found multiple SNTP configurations but {CONF_ID} is inconsistent"
)
merged = merge_config(merged, sntp_conf)
# Deduplicate servers while preserving order
servers = merged[CONF_SERVERS]
unique_servers = list(dict.fromkeys(servers))
# Warn if we're dropping servers due to 3-server limit
if len(unique_servers) > 3:
dropped = unique_servers[3:]
unique_servers = unique_servers[:3]
_LOGGER.warning(
"SNTP supports maximum 3 servers. Dropped excess server(s): %s",
dropped,
)
merged[CONF_SERVERS] = unique_servers
_LOGGER.warning(
"Found and merged %d SNTP time configurations into one instance",
len(sntp_configs),
)
# Replace time configs with merged SNTP + other time platforms
other_time_configs.append(merged)
full_conf[CONF_TIME] = other_time_configs
fv.full_config.set(full_conf)
CONFIG_SCHEMA = cv.All(
time_.TIME_SCHEMA.extend(
{
@@ -40,6 +106,8 @@ CONFIG_SCHEMA = cv.All(
),
)
FINAL_VALIDATE_SCHEMA = _sntp_final_validate
async def to_code(config):
servers = config[CONF_SERVERS]

View File

@@ -137,11 +137,7 @@ async def to_code(config):
cg.add(var.set_arming_night_time(config[CONF_ARMING_NIGHT_TIME]))
supports_arm_night = True
if sensors := config.get(CONF_BINARY_SENSORS, []):
# Initialize FixedVector with the exact number of sensors
cg.add(var.init_sensors(len(sensors)))
for sensor in sensors:
for sensor in config.get(CONF_BINARY_SENSORS, []):
bs = await cg.get_variable(sensor[CONF_INPUT])
flags = BinarySensorFlags[FLAG_NORMAL]

View File

@@ -20,13 +20,10 @@ void TemplateAlarmControlPanel::add_sensor(binary_sensor::BinarySensor *sensor,
// Save the flags and type. Assign a store index for the per sensor data type.
SensorDataStore sd;
sd.last_chime_state = false;
AlarmSensor alarm_sensor;
alarm_sensor.sensor = sensor;
alarm_sensor.info.flags = flags;
alarm_sensor.info.type = type;
alarm_sensor.info.store_index = this->next_store_index_++;
this->sensors_.push_back(alarm_sensor);
this->sensor_map_[sensor].flags = flags;
this->sensor_map_[sensor].type = type;
this->sensor_data_.push_back(sd);
this->sensor_map_[sensor].store_index = this->next_store_index_++;
};
static const LogString *sensor_type_to_string(AlarmSensorType type) {
@@ -48,7 +45,7 @@ void TemplateAlarmControlPanel::dump_config() {
ESP_LOGCONFIG(TAG,
"TemplateAlarmControlPanel:\n"
" Current State: %s\n"
" Number of Codes: %zu\n"
" Number of Codes: %u\n"
" Requires Code To Arm: %s\n"
" Arming Away Time: %" PRIu32 "s\n"
" Arming Home Time: %" PRIu32 "s\n"
@@ -61,8 +58,7 @@ void TemplateAlarmControlPanel::dump_config() {
(this->arming_home_time_ / 1000), (this->arming_night_time_ / 1000), (this->pending_time_ / 1000),
(this->trigger_time_ / 1000), this->get_supported_features());
#ifdef USE_BINARY_SENSOR
for (const auto &alarm_sensor : this->sensors_) {
const uint16_t flags = alarm_sensor.info.flags;
for (auto const &[sensor, info] : this->sensor_map_) {
ESP_LOGCONFIG(TAG,
" Binary Sensor:\n"
" Name: %s\n"
@@ -71,10 +67,11 @@ void TemplateAlarmControlPanel::dump_config() {
" Armed night bypass: %s\n"
" Auto bypass: %s\n"
" Chime mode: %s",
alarm_sensor.sensor->get_name().c_str(), LOG_STR_ARG(sensor_type_to_string(alarm_sensor.info.type)),
TRUEFALSE(flags & BINARY_SENSOR_MODE_BYPASS_ARMED_HOME),
TRUEFALSE(flags & BINARY_SENSOR_MODE_BYPASS_ARMED_NIGHT),
TRUEFALSE(flags & BINARY_SENSOR_MODE_BYPASS_AUTO), TRUEFALSE(flags & BINARY_SENSOR_MODE_CHIME));
sensor->get_name().c_str(), LOG_STR_ARG(sensor_type_to_string(info.type)),
TRUEFALSE(info.flags & BINARY_SENSOR_MODE_BYPASS_ARMED_HOME),
TRUEFALSE(info.flags & BINARY_SENSOR_MODE_BYPASS_ARMED_NIGHT),
TRUEFALSE(info.flags & BINARY_SENSOR_MODE_BYPASS_AUTO),
TRUEFALSE(info.flags & BINARY_SENSOR_MODE_CHIME));
}
#endif
}
@@ -124,9 +121,7 @@ void TemplateAlarmControlPanel::loop() {
#ifdef USE_BINARY_SENSOR
// Test all of the sensors regardless of the alarm panel state
for (const auto &alarm_sensor : this->sensors_) {
const auto &info = alarm_sensor.info;
auto *sensor = alarm_sensor.sensor;
for (auto const &[sensor, info] : this->sensor_map_) {
// Check for chime zones
if (info.flags & BINARY_SENSOR_MODE_CHIME) {
// Look for the transition from closed to open
@@ -247,11 +242,11 @@ void TemplateAlarmControlPanel::arm_(optional<std::string> code, alarm_control_p
void TemplateAlarmControlPanel::bypass_before_arming() {
#ifdef USE_BINARY_SENSOR
for (const auto &alarm_sensor : this->sensors_) {
for (auto const &[sensor, info] : this->sensor_map_) {
// Check for faulted bypass_auto sensors and remove them from monitoring
if ((alarm_sensor.info.flags & BINARY_SENSOR_MODE_BYPASS_AUTO) && (alarm_sensor.sensor->state)) {
ESP_LOGW(TAG, "'%s' is faulted and will be automatically bypassed", alarm_sensor.sensor->get_name().c_str());
this->bypassed_sensor_indicies_.push_back(alarm_sensor.info.store_index);
if ((info.flags & BINARY_SENSOR_MODE_BYPASS_AUTO) && (sensor->state)) {
ESP_LOGW(TAG, "'%s' is faulted and will be automatically bypassed", sensor->get_name().c_str());
this->bypassed_sensor_indicies_.push_back(info.store_index);
}
}
#endif

View File

@@ -1,12 +1,11 @@
#pragma once
#include <cinttypes>
#include <vector>
#include <map>
#include "esphome/core/automation.h"
#include "esphome/core/component.h"
#include "esphome/core/defines.h"
#include "esphome/core/helpers.h"
#include "esphome/components/alarm_control_panel/alarm_control_panel.h"
@@ -50,13 +49,6 @@ struct SensorInfo {
uint8_t store_index;
};
#ifdef USE_BINARY_SENSOR
struct AlarmSensor {
binary_sensor::BinarySensor *sensor;
SensorInfo info;
};
#endif
class TemplateAlarmControlPanel final : public alarm_control_panel::AlarmControlPanel, public Component {
public:
TemplateAlarmControlPanel();
@@ -71,12 +63,6 @@ class TemplateAlarmControlPanel final : public alarm_control_panel::AlarmControl
void bypass_before_arming();
#ifdef USE_BINARY_SENSOR
/** Initialize the sensors vector with the specified capacity.
*
* @param capacity The number of sensors to allocate space for.
*/
void init_sensors(size_t capacity) { this->sensors_.init(capacity); }
/** Add a binary_sensor to the alarm_panel.
*
* @param sensor The BinarySensor instance.
@@ -136,8 +122,8 @@ class TemplateAlarmControlPanel final : public alarm_control_panel::AlarmControl
protected:
void control(const alarm_control_panel::AlarmControlPanelCall &call) override;
#ifdef USE_BINARY_SENSOR
// List of binary sensors with their alarm-specific info
FixedVector<AlarmSensor> sensors_;
// This maps a binary sensor to its alarm specific info
std::map<binary_sensor::BinarySensor *, SensorInfo> sensor_map_;
// a list of automatically bypassed sensors
std::vector<uint8_t> bypassed_sensor_indicies_;
#endif

View File

@@ -1,4 +1,4 @@
pylint==4.0.2
pylint==4.0.3
flake8==7.3.0 # also change in .pre-commit-config.yaml when updating
ruff==0.14.4 # also change in .pre-commit-config.yaml when updating
pyupgrade==3.21.1 # also change in .pre-commit-config.yaml when updating

View File

@@ -0,0 +1 @@
"""Tests for SNTP component."""

View File

@@ -0,0 +1,22 @@
esphome:
name: sntp-test
esp32:
board: esp32dev
framework:
type: esp-idf
wifi:
ssid: "testssid"
password: "testpassword"
# Test multiple SNTP instances that should be merged
time:
- platform: sntp
servers:
- 192.168.1.1
- pool.ntp.org
- platform: sntp
servers:
- pool.ntp.org
- 192.168.1.2

View File

@@ -0,0 +1,238 @@
"""Tests for SNTP time configuration validation."""
from __future__ import annotations
import logging
from typing import Any
import pytest
from esphome import config_validation as cv
from esphome.components.sntp.time import CONF_SNTP, _sntp_final_validate
from esphome.const import CONF_ID, CONF_PLATFORM, CONF_SERVERS, CONF_TIME
from esphome.core import ID
import esphome.final_validate as fv
@pytest.mark.parametrize(
("time_configs", "expected_count", "expected_servers", "warning_messages"),
[
pytest.param(
[
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time", is_manual=False),
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
}
],
1,
["192.168.1.1", "pool.ntp.org"],
[],
id="single_instance_no_merge",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=False),
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=False),
CONF_SERVERS: ["192.168.1.2"],
},
],
1,
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
["Found and merged 2 SNTP time configurations into one instance"],
id="two_instances_merged",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=False),
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=False),
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
},
],
1,
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
["Found and merged 2 SNTP time configurations into one instance"],
id="deduplication_preserves_order",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=False),
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=False),
CONF_SERVERS: ["192.168.1.2", "pool2.ntp.org"],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_3", is_manual=False),
CONF_SERVERS: ["pool3.ntp.org"],
},
],
1,
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
[
"SNTP supports maximum 3 servers. Dropped excess server(s): ['pool2.ntp.org', 'pool3.ntp.org']",
"Found and merged 3 SNTP time configurations into one instance",
],
id="three_instances_drops_excess_servers",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=False),
CONF_SERVERS: [
"192.168.1.1",
"pool.ntp.org",
"pool.ntp.org",
"192.168.1.1",
],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=False),
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
},
],
1,
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
["Found and merged 2 SNTP time configurations into one instance"],
id="deduplication_multiple_duplicates",
),
],
)
def test_sntp_instance_merging(
time_configs: list[dict[str, Any]],
expected_count: int,
expected_servers: list[str],
warning_messages: list[str],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test SNTP instance merging behavior."""
# Create a mock full config with time configs
full_conf = {CONF_TIME: time_configs.copy()}
# Set the context var
token = fv.full_config.set(full_conf)
try:
with caplog.at_level(logging.WARNING):
_sntp_final_validate({})
# Get the updated config
updated_conf = fv.full_config.get()
# Check if merging occurred
if len(time_configs) > 1:
# Verify only one SNTP instance remains
sntp_instances = [
tc
for tc in updated_conf[CONF_TIME]
if tc.get(CONF_PLATFORM) == CONF_SNTP
]
assert len(sntp_instances) == expected_count
# Verify server list
assert sntp_instances[0][CONF_SERVERS] == expected_servers
# Verify warnings
for expected_msg in warning_messages:
assert any(
expected_msg in record.message for record in caplog.records
), f"Expected warning message '{expected_msg}' not found in log"
else:
# Single instance should not trigger merging or warnings
assert len(caplog.records) == 0
# Config should be unchanged
assert updated_conf[CONF_TIME] == time_configs
finally:
fv.full_config.reset(token)
def test_sntp_inconsistent_manual_ids() -> None:
"""Test that inconsistent manual IDs raise an error."""
# Create configs with manual IDs that are inconsistent
time_configs = [
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=True),
CONF_SERVERS: ["192.168.1.1"],
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=True),
CONF_SERVERS: ["192.168.1.2"],
},
]
full_conf = {CONF_TIME: time_configs}
token = fv.full_config.set(full_conf)
try:
with pytest.raises(
cv.Invalid,
match="Found multiple SNTP configurations but id is inconsistent",
):
_sntp_final_validate({})
finally:
fv.full_config.reset(token)
def test_sntp_with_other_time_platforms(caplog: pytest.LogCaptureFixture) -> None:
"""Test that SNTP merging doesn't affect other time platforms."""
time_configs = [
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_1", is_manual=False),
CONF_SERVERS: ["192.168.1.1"],
},
{
CONF_PLATFORM: "homeassistant",
CONF_ID: ID("homeassistant_time", is_manual=False),
},
{
CONF_PLATFORM: CONF_SNTP,
CONF_ID: ID("sntp_time_2", is_manual=False),
CONF_SERVERS: ["192.168.1.2"],
},
]
full_conf = {CONF_TIME: time_configs.copy()}
token = fv.full_config.set(full_conf)
try:
with caplog.at_level(logging.WARNING):
_sntp_final_validate({})
updated_conf = fv.full_config.get()
# Should have 2 time platforms: 1 merged SNTP + 1 homeassistant
assert len(updated_conf[CONF_TIME]) == 2
# Find the platforms
platforms = {tc[CONF_PLATFORM] for tc in updated_conf[CONF_TIME]}
assert platforms == {CONF_SNTP, "homeassistant"}
# Verify SNTP was merged
sntp_instances = [
tc for tc in updated_conf[CONF_TIME] if tc[CONF_PLATFORM] == CONF_SNTP
]
assert len(sntp_instances) == 1
assert sntp_instances[0][CONF_SERVERS] == ["192.168.1.1", "192.168.1.2"]
finally:
fv.full_config.reset(token)

View File

@@ -1,136 +0,0 @@
esphome:
name: template-alarm-many-sensors
friendly_name: "Template Alarm Control Panel with Many Sensors"
logger:
host:
api:
binary_sensor:
- platform: template
id: sensor1
name: "Door 1"
- platform: template
id: sensor2
name: "Door 2"
- platform: template
id: sensor3
name: "Window 1"
- platform: template
id: sensor4
name: "Window 2"
- platform: template
id: sensor5
name: "Motion 1"
- platform: template
id: sensor6
name: "Motion 2"
- platform: template
id: sensor7
name: "Glass Break 1"
- platform: template
id: sensor8
name: "Glass Break 2"
- platform: template
id: sensor9
name: "Smoke Detector"
- platform: template
id: sensor10
name: "CO Detector"
alarm_control_panel:
- platform: template
id: test_alarm
name: "Test Alarm"
codes:
- "1234"
requires_code_to_arm: true
arming_away_time: 5s
arming_home_time: 3s
arming_night_time: 3s
pending_time: 10s
trigger_time: 300s
restore_mode: ALWAYS_DISARMED
binary_sensors:
- input: sensor1
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: true
chime: true
trigger_mode: DELAYED
- input: sensor2
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: true
chime: true
trigger_mode: DELAYED
- input: sensor3
bypass_armed_home: true
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: DELAYED
- input: sensor4
bypass_armed_home: true
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: DELAYED
- input: sensor5
bypass_armed_home: false
bypass_armed_night: true
bypass_auto: false
chime: false
trigger_mode: INSTANT
- input: sensor6
bypass_armed_home: false
bypass_armed_night: true
bypass_auto: false
chime: false
trigger_mode: INSTANT
- input: sensor7
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: INSTANT
- input: sensor8
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: INSTANT
- input: sensor9
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: INSTANT_ALWAYS
- input: sensor10
bypass_armed_home: false
bypass_armed_night: false
bypass_auto: false
chime: false
trigger_mode: INSTANT_ALWAYS
on_disarmed:
- logger.log: "Alarm disarmed"
on_arming:
- logger.log: "Alarm arming"
on_armed_away:
- logger.log: "Alarm armed away"
on_armed_home:
- logger.log: "Alarm armed home"
on_armed_night:
- logger.log: "Alarm armed night"
on_pending:
- logger.log: "Alarm pending"
on_triggered:
- logger.log: "Alarm triggered"
on_cleared:
- logger.log: "Alarm cleared"
on_chime:
- logger.log: "Chime activated"
on_ready:
- logger.log: "Sensors ready state changed"

View File

@@ -1,118 +0,0 @@
"""Integration test for template alarm control panel with many sensors."""
from __future__ import annotations
import aioesphomeapi
from aioesphomeapi.model import APIIntEnum
import pytest
from .state_utils import InitialStateHelper
from .types import APIClientConnectedFactory, RunCompiledFunction
class EspHomeACPFeatures(APIIntEnum):
"""ESPHome AlarmControlPanel feature numbers."""
ARM_HOME = 1
ARM_AWAY = 2
ARM_NIGHT = 4
TRIGGER = 8
ARM_CUSTOM_BYPASS = 16
ARM_VACATION = 32
@pytest.mark.asyncio
async def test_template_alarm_control_panel_many_sensors(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test template alarm control panel with 10 binary sensors using FixedVector."""
async with run_compiled(yaml_config), api_client_connected() as client:
# Get entity info first
entities, _ = await client.list_entities_services()
# Find the alarm control panel and binary sensors
alarm_info: aioesphomeapi.AlarmControlPanelInfo | None = None
binary_sensors: list[aioesphomeapi.BinarySensorInfo] = []
for entity in entities:
if isinstance(entity, aioesphomeapi.AlarmControlPanelInfo):
alarm_info = entity
elif isinstance(entity, aioesphomeapi.BinarySensorInfo):
binary_sensors.append(entity)
assert alarm_info is not None, "Alarm control panel entity info not found"
assert alarm_info.name == "Test Alarm"
assert alarm_info.requires_code is True
assert alarm_info.requires_code_to_arm is True
# Verify we have 10 binary sensors
assert len(binary_sensors) == 10, (
f"Expected 10 binary sensors, got {len(binary_sensors)}"
)
# Verify sensor names
expected_sensor_names = {
"Door 1",
"Door 2",
"Window 1",
"Window 2",
"Motion 1",
"Motion 2",
"Glass Break 1",
"Glass Break 2",
"Smoke Detector",
"CO Detector",
}
actual_sensor_names = {sensor.name for sensor in binary_sensors}
assert actual_sensor_names == expected_sensor_names, (
f"Sensor names mismatch. Expected: {expected_sensor_names}, "
f"Got: {actual_sensor_names}"
)
# Use InitialStateHelper to wait for all initial states
state_helper = InitialStateHelper(entities)
def on_state(state: aioesphomeapi.EntityState) -> None:
# We'll receive subsequent states here after initial states
pass
client.subscribe_states(state_helper.on_state_wrapper(on_state))
# Wait for all initial states
await state_helper.wait_for_initial_states(timeout=5.0)
# Verify the alarm state is disarmed initially
alarm_state = state_helper.initial_states.get(alarm_info.key)
assert alarm_state is not None, "Alarm control panel initial state not received"
assert isinstance(alarm_state, aioesphomeapi.AlarmControlPanelEntityState)
assert alarm_state.state == aioesphomeapi.AlarmControlPanelState.DISARMED, (
f"Expected initial state DISARMED, got {alarm_state.state}"
)
# Verify all 10 binary sensors have initial states
binary_sensor_states = [
state_helper.initial_states.get(sensor.key) for sensor in binary_sensors
]
assert all(state is not None for state in binary_sensor_states), (
"Not all binary sensors have initial states"
)
# Verify all binary sensor states are BinarySensorState type
for i, state in enumerate(binary_sensor_states):
assert isinstance(state, aioesphomeapi.BinarySensorState), (
f"Binary sensor {i} state is not BinarySensorState: {type(state)}"
)
# Verify supported features
expected_features = (
EspHomeACPFeatures.ARM_HOME
| EspHomeACPFeatures.ARM_AWAY
| EspHomeACPFeatures.ARM_NIGHT
| EspHomeACPFeatures.TRIGGER
)
assert alarm_info.supported_features == expected_features, (
f"Expected supported_features={expected_features} (ARM_HOME|ARM_AWAY|ARM_NIGHT|TRIGGER), "
f"got {alarm_info.supported_features}"
)