1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-19 03:32:20 +01:00
This commit is contained in:
J. Nick Koston
2025-06-22 13:48:27 +02:00
parent 17bf533ed7
commit 0764fa7292

View File

@@ -490,78 +490,85 @@ async def to_code(config: ConfigType) -> None:
if config[CONF_PLATFORMIO_OPTIONS]: if config[CONF_PLATFORMIO_OPTIONS]:
CORE.add_job(_add_platformio_options, config[CONF_PLATFORMIO_OPTIONS]) CORE.add_job(_add_platformio_options, config[CONF_PLATFORMIO_OPTIONS])
# Count total areas for reservation # Helper function to process an area configuration
total_areas = len(config[CONF_AREAS]) def process_area(
if config.get(CONF_AREA): area_conf: dict[str, str | core.ID],
total_areas += 1 area_hashes: dict[int, str],
area_ids: set[str],
# Reserve space for areas if any are defined conf_path: str | None = None,
if total_areas: ) -> None:
cg.add(cg.RawStatement(f"App.reserve_area({total_areas});")) """Process and register an area configuration."""
cg.add_define("USE_AREAS")
# Handle area configuration
area_hashes: dict[int, str] = {}
area_ids: set[str] = set()
device_hashes: dict[int, str] = {}
area_conf: dict[str, str | core.ID] | None
if area_conf := config.get(CONF_AREA):
# At this point, validation has already converted string to structured format
area_id: core.ID = area_conf[CONF_ID] area_id: core.ID = area_conf[CONF_ID]
area_id_str: str = area_id.id area_id_str: str = area_id.id
area_var = cg.new_Pvariable(area_id)
area_id_hash = fnv1a_32bit_hash(area_id_str) area_id_hash = fnv1a_32bit_hash(area_id_str)
area_name = area_conf[CONF_NAME] area_name: str = area_conf[CONF_NAME]
if conf_path: # Only verify collisions for areas from CONF_AREAS list
_verify_no_collisions(area_hashes, area_id, area_id_hash, conf_path)
area_hashes[area_id_hash] = area_name area_hashes[area_id_hash] = area_name
area_ids.add(area_id_str) area_ids.add(area_id_str)
area_var = cg.new_Pvariable(area_id)
cg.add(area_var.set_area_id(area_id_hash)) cg.add(area_var.set_area_id(area_id_hash))
cg.add(area_var.set_name(area_name)) cg.add(area_var.set_name(area_name))
cg.add(cg.App.register_area(area_var)) cg.add(cg.App.register_area(area_var))
# Process devices and areas # Initialize tracking structures
devices: list[dict[str, str | core.ID]] area_hashes: dict[int, str] = {}
if not (devices := config[CONF_DEVICES]): area_ids: set[str] = set()
device_hashes: dict[int, str] = {}
# Collect all areas to process
all_areas: list[tuple[dict[str, str | core.ID], str | None]] = []
# Add top-level area if present
if area_conf := config.get(CONF_AREA):
all_areas.append((area_conf, None))
# Add areas from CONF_AREAS list
all_areas.extend((area, CONF_AREAS) for area in config[CONF_AREAS])
# Reserve space for areas and process them
if all_areas:
cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});"))
cg.add_define("USE_AREAS")
for area_conf, conf_path in all_areas:
process_area(area_conf, area_hashes, area_ids, conf_path)
# Process devices
devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES]
if not devices:
return return
# Reserve space for devices # Reserve space for devices
cg.add(cg.RawStatement(f"App.reserve_device({len(devices)});")) cg.add(cg.RawStatement(f"App.reserve_device({len(devices)});"))
cg.add_define("USE_DEVICES") cg.add_define("USE_DEVICES")
# Process additional areas from the areas list # Process each device
areas: list[dict[str, str | core.ID]]
if areas := config[CONF_AREAS]:
for area_conf in areas:
area_id: core.ID = area_conf[CONF_ID]
area_ids.add(area_id.id)
area = cg.new_Pvariable(area_id)
area_id_hash = fnv1a_32bit_hash(area_id.id)
area_name: str = area_conf[CONF_NAME]
_verify_no_collisions(area_hashes, area_id, area_id_hash, CONF_AREAS)
area_hashes[area_id_hash] = area_name
cg.add(area.set_area_id(area_id_hash))
cg.add(area.set_name(area_name))
cg.add(cg.App.register_area(area))
# Process devices
for dev_conf in devices: for dev_conf in devices:
device_id = dev_conf[CONF_ID] device_id: core.ID = dev_conf[CONF_ID]
device_id_hash = fnv1a_32bit_hash(device_id.id) device_id_hash = fnv1a_32bit_hash(device_id.id)
device_name = dev_conf[CONF_NAME] device_name: str = dev_conf[CONF_NAME]
_verify_no_collisions(device_hashes, device_id, device_id_hash, CONF_DEVICES) _verify_no_collisions(device_hashes, device_id, device_id_hash, CONF_DEVICES)
device_hashes[device_id_hash] = device_name device_hashes[device_id_hash] = device_name
dev = cg.new_Pvariable(device_id) dev = cg.new_Pvariable(device_id)
cg.add(dev.set_device_id(device_id_hash)) cg.add(dev.set_device_id(device_id_hash))
cg.add(dev.set_name(device_name)) cg.add(dev.set_name(device_name))
# Set area if specified
if CONF_AREA_ID in dev_conf: if CONF_AREA_ID in dev_conf:
# Get the area variable and use its area_id area_id: core.ID = dev_conf[CONF_AREA_ID]
area_id = dev_conf[CONF_AREA_ID]
area_id_hash = fnv1a_32bit_hash(area_id.id)
if area_id.id not in area_ids: if area_id.id not in area_ids:
raise vol.Invalid( raise vol.Invalid(
f"Device '{device_name}' has an area_id '{area_id.id}'" f"Device '{device_name}' has an area_id '{area_id.id}'"
" that does not exist.", " that does not exist.",
path=[CONF_DEVICES, dev_conf[CONF_ID], CONF_AREA_ID], path=[CONF_DEVICES, device_id, CONF_AREA_ID],
) )
area_id_hash = fnv1a_32bit_hash(area_id.id)
cg.add(dev.set_area_id(area_id_hash)) cg.add(dev.set_area_id(area_id_hash))
cg.add(cg.App.register_device(dev)) cg.add(cg.App.register_device(dev))