mirror of
https://github.com/esphome/esphome.git
synced 2025-09-12 08:12:22 +01:00
validate sooner
This commit is contained in:
@@ -108,6 +108,50 @@ def validate_hostname(config):
|
||||
return config
|
||||
|
||||
|
||||
def validate_id_hash_collisions(config: dict) -> dict:
|
||||
"""Validate that there are no hash collisions between IDs of the same type."""
|
||||
from esphome.helpers import fnv1a_32bit_hash
|
||||
|
||||
# Check area hash collisions
|
||||
area_hashes: dict[int, str] = {}
|
||||
|
||||
# Check main area if present
|
||||
if CONF_AREA in config:
|
||||
area_id: core.ID = config[CONF_AREA][CONF_ID]
|
||||
if area_id.id:
|
||||
area_hash = fnv1a_32bit_hash(area_id.id)
|
||||
area_hashes[area_hash] = area_id.id
|
||||
|
||||
# Check areas list
|
||||
for area in config.get(CONF_AREAS, []):
|
||||
area_id: core.ID = area[CONF_ID]
|
||||
if area_id.id:
|
||||
area_hash = fnv1a_32bit_hash(area_id.id)
|
||||
if area_hash in area_hashes:
|
||||
raise cv.Invalid(
|
||||
f"Area ID '{area_id.id}' with hash {area_hash} collides with "
|
||||
f"existing area ID '{area_hashes[area_hash]}'",
|
||||
path=[CONF_AREAS, area_id.id],
|
||||
)
|
||||
area_hashes[area_hash] = area_id.id
|
||||
|
||||
# Check device hash collisions
|
||||
device_hashes: dict[int, str] = {}
|
||||
for device in config.get(CONF_DEVICES, []):
|
||||
device_id: core.ID = device[CONF_ID]
|
||||
if device_id.id:
|
||||
device_hash = fnv1a_32bit_hash(device_id.id)
|
||||
if device_hash in device_hashes:
|
||||
raise cv.Invalid(
|
||||
f"Device ID '{device_id.id}' with hash {device_hash} collides with "
|
||||
f"existing device ID '{device_hashes[device_hash]}'",
|
||||
path=[CONF_DEVICES, device_id.id],
|
||||
)
|
||||
device_hashes[device_hash] = device_id.id
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def valid_include(value):
|
||||
# Look for "<...>" includes
|
||||
if value.startswith("<") and value.endswith(">"):
|
||||
@@ -232,6 +276,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
}
|
||||
),
|
||||
validate_hostname,
|
||||
validate_id_hash_collisions,
|
||||
)
|
||||
|
||||
PRELOAD_CONFIG_SCHEMA = cv.Schema(
|
||||
@@ -397,17 +442,6 @@ async def _add_platform_reserves() -> None:
|
||||
cg.add(cg.RawStatement(f"App.reserve_{platform_name}({count});"), prepend=True)
|
||||
|
||||
|
||||
def _verify_no_collisions(
|
||||
hashes: dict[int, str], id: str, id_hash: int, conf_key: str
|
||||
) -> None:
|
||||
"""Verify that the given id and name do not collide with existing ones."""
|
||||
if id_hash in hashes:
|
||||
raise vol.Invalid(
|
||||
f"ID '{id}' with hash {id_hash} collides with existing ID '{hashes[id_hash]}'",
|
||||
path=[conf_key],
|
||||
)
|
||||
|
||||
|
||||
@coroutine_with_priority(100.0)
|
||||
async def to_code(config: ConfigType) -> None:
|
||||
cg.add_global(cg.global_ns.namespace("esphome").using)
|
||||
@@ -493,9 +527,7 @@ async def to_code(config: ConfigType) -> None:
|
||||
# Helper function to process an area configuration
|
||||
def process_area(
|
||||
area_conf: dict[str, str | core.ID],
|
||||
area_hashes: dict[int, str],
|
||||
area_ids: set[str],
|
||||
conf_path: str | None = None,
|
||||
) -> None:
|
||||
"""Process and register an area configuration."""
|
||||
area_id: core.ID = area_conf[CONF_ID]
|
||||
@@ -503,10 +535,6 @@ async def to_code(config: ConfigType) -> None:
|
||||
area_id_hash = fnv1a_32bit_hash(area_id_str)
|
||||
area_name: str = area_conf[CONF_NAME]
|
||||
|
||||
if conf_path: # Only verify collisions for areas from CONF_AREAS list
|
||||
_verify_no_collisions(area_hashes, area_id, area_id_hash, conf_path)
|
||||
|
||||
area_hashes[area_id_hash] = area_name
|
||||
area_ids.add(area_id_str)
|
||||
|
||||
area_var = cg.new_Pvariable(area_id)
|
||||
@@ -515,27 +543,25 @@ async def to_code(config: ConfigType) -> None:
|
||||
cg.add(cg.App.register_area(area_var))
|
||||
|
||||
# Initialize tracking structures
|
||||
area_hashes: dict[int, str] = {}
|
||||
area_ids: set[str] = set()
|
||||
device_hashes: dict[int, str] = {}
|
||||
|
||||
# Collect all areas to process
|
||||
all_areas: list[tuple[dict[str, str | core.ID], str | None]] = []
|
||||
all_areas: list[dict[str, str | core.ID]] = []
|
||||
|
||||
# Add top-level area if present
|
||||
if area_conf := config.get(CONF_AREA):
|
||||
all_areas.append((area_conf, None))
|
||||
all_areas.append(area_conf)
|
||||
|
||||
# Add areas from CONF_AREAS list
|
||||
all_areas.extend((area, CONF_AREAS) for area in config[CONF_AREAS])
|
||||
all_areas.extend(config[CONF_AREAS])
|
||||
|
||||
# Reserve space for areas and process them
|
||||
if all_areas:
|
||||
cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});"))
|
||||
cg.add_define("USE_AREAS")
|
||||
|
||||
for area_conf, conf_path in all_areas:
|
||||
process_area(area_conf, area_hashes, area_ids, conf_path)
|
||||
for area_conf in all_areas:
|
||||
process_area(area_conf, area_ids)
|
||||
|
||||
# Process devices
|
||||
devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES]
|
||||
@@ -552,9 +578,6 @@ async def to_code(config: ConfigType) -> None:
|
||||
device_id_hash = fnv1a_32bit_hash(device_id.id)
|
||||
device_name: str = dev_conf[CONF_NAME]
|
||||
|
||||
_verify_no_collisions(device_hashes, device_id, device_id_hash, CONF_DEVICES)
|
||||
device_hashes[device_id_hash] = device_name
|
||||
|
||||
dev = cg.new_Pvariable(device_id)
|
||||
cg.add(dev.set_device_id(device_id_hash))
|
||||
cg.add(dev.set_name(device_name))
|
||||
|
@@ -172,7 +172,11 @@ def test_area_id_collision(
|
||||
|
||||
# Check for the specific error message in stdout
|
||||
captured = capsys.readouterr()
|
||||
assert "ID duplicate_id redefined! Check esphome->area->id." in captured.out
|
||||
# Since duplicate IDs have the same hash, our hash collision detection catches this
|
||||
assert (
|
||||
"Area ID 'duplicate_id' with hash 1805131238 collides with existing area ID 'duplicate_id'"
|
||||
in captured.out
|
||||
)
|
||||
|
||||
|
||||
def test_device_without_area(yaml_file: Callable[[str], str]) -> None:
|
||||
@@ -193,3 +197,47 @@ def test_device_without_area(yaml_file: Callable[[str], str]) -> None:
|
||||
|
||||
# Verify no area_id is present
|
||||
assert "area_id" not in device
|
||||
|
||||
|
||||
def test_device_with_invalid_area_id(
|
||||
yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
"""Test that device with non-existent area_id fails validation."""
|
||||
result = load_config_from_fixture(yaml_file, "device_invalid_area.yaml")
|
||||
assert result is None
|
||||
|
||||
# Check for the specific error message in stdout
|
||||
captured = capsys.readouterr()
|
||||
assert "Couldn't find ID 'nonexistent_area'" in captured.out
|
||||
|
||||
|
||||
def test_device_id_hash_collision(
|
||||
yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
"""Test that device IDs with hash collisions are detected."""
|
||||
result = load_config_from_fixture(yaml_file, "device_id_collision.yaml")
|
||||
assert result is None
|
||||
|
||||
# Check for the specific error message about hash collision
|
||||
captured = capsys.readouterr()
|
||||
# The error message shows the ID that collides and includes the hash value
|
||||
assert (
|
||||
"Device ID 'd6ka' with hash 3082558663 collides with existing device ID 'test_2258'"
|
||||
in captured.out
|
||||
)
|
||||
|
||||
|
||||
def test_area_id_hash_collision(
|
||||
yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
"""Test that area IDs with hash collisions are detected."""
|
||||
result = load_config_from_fixture(yaml_file, "area_id_hash_collision.yaml")
|
||||
assert result is None
|
||||
|
||||
# Check for the specific error message about hash collision
|
||||
captured = capsys.readouterr()
|
||||
# The error message shows the ID that collides and includes the hash value
|
||||
assert (
|
||||
"Area ID 'd6ka' with hash 3082558663 collides with existing area ID 'test_2258'"
|
||||
in captured.out
|
||||
)
|
||||
|
Reference in New Issue
Block a user