1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-18 15:55:46 +00:00

[web_server.ota] Merge multiple instances to prevent undefined behavior

This commit is contained in:
J. Nick Koston
2025-11-14 08:59:05 -06:00
parent e49a943cf7
commit 5f10fbc4f6
2 changed files with 178 additions and 1 deletions

View File

@@ -1,17 +1,69 @@
import logging
import esphome.codegen as cg
from esphome.components.esp32 import add_idf_component
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
from esphome.config_helpers import merge_config
import esphome.config_validation as cv
from esphome.const import CONF_ID
from esphome.const import CONF_ID, CONF_OTA, CONF_PLATFORM
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
import esphome.final_validate as fv
from esphome.types import ConfigType
_LOGGER = logging.getLogger(__name__)
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network", "web_server_base"]
CONF_WEB_SERVER = "web_server"
web_server_ns = cg.esphome_ns.namespace("web_server")
WebServerOTAComponent = web_server_ns.class_("WebServerOTAComponent", OTAComponent)
def _web_server_ota_final_validate(config: ConfigType) -> None:
"""Merge multiple web_server OTA instances into one.
Multiple web_server OTA instances register duplicate HTTP handlers for /update,
causing undefined behavior. Merge them into a single instance.
"""
full_conf = fv.full_config.get()
ota_confs = full_conf.get(CONF_OTA, [])
web_server_ota_configs: list[ConfigType] = []
other_ota_configs: list[ConfigType] = []
for ota_conf in ota_confs:
if ota_conf.get(CONF_PLATFORM) == CONF_WEB_SERVER:
web_server_ota_configs.append(ota_conf)
else:
other_ota_configs.append(ota_conf)
if len(web_server_ota_configs) <= 1:
return
# Merge all web_server OTA configs into the first one
merged = web_server_ota_configs[0]
for ota_conf in web_server_ota_configs[1:]:
# Validate that IDs are consistent if manually specified
if merged[CONF_ID].is_manual and ota_conf[CONF_ID].is_manual:
raise cv.Invalid(
f"Found multiple web_server OTA configurations but {CONF_ID} is inconsistent"
)
merged = merge_config(merged, ota_conf)
_LOGGER.warning(
"Found and merged %d web_server OTA configurations into one instance",
len(web_server_ota_configs),
)
# Replace OTA configs with merged web_server + other OTA platforms
other_ota_configs.append(merged)
full_conf[CONF_OTA] = other_ota_configs
fv.full_config.set(full_conf)
CONFIG_SCHEMA = (
cv.Schema(
{
@@ -22,6 +74,8 @@ CONFIG_SCHEMA = (
.extend(cv.COMPONENT_SCHEMA)
)
FINAL_VALIDATE_SCHEMA = _web_server_ota_final_validate
@coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
async def to_code(config):

View File

@@ -1,6 +1,21 @@
"""Tests for the web_server OTA platform."""
from __future__ import annotations
from collections.abc import Callable
import logging
from typing import Any
import pytest
from esphome import config_validation as cv
from esphome.components.web_server.ota import (
CONF_WEB_SERVER,
_web_server_ota_final_validate,
)
from esphome.const import CONF_ID, CONF_OTA, CONF_PLATFORM
from esphome.core import ID
import esphome.final_validate as fv
def test_web_server_ota_generated(generate_main: Callable[[str], str]) -> None:
@@ -100,3 +115,111 @@ def test_web_server_ota_esp8266(generate_main: Callable[[str], str]) -> None:
# Check web server OTA component is present
assert "WebServerOTAComponent" in main_cpp
assert "web_server::WebServerOTAComponent" in main_cpp
@pytest.mark.parametrize(
("ota_configs", "expected_count", "warning_expected"),
[
pytest.param(
[
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web", is_manual=False),
}
],
1,
False,
id="single_instance_no_merge",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_1", is_manual=False),
},
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_2", is_manual=False),
},
],
1,
True,
id="two_instances_merged",
),
pytest.param(
[
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_1", is_manual=False),
},
{
CONF_PLATFORM: "esphome",
CONF_ID: ID("ota_esphome", is_manual=False),
},
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_2", is_manual=False),
},
],
2,
True,
id="mixed_platforms_web_server_merged",
),
],
)
def test_web_server_ota_instance_merging(
ota_configs: list[dict[str, Any]],
expected_count: int,
warning_expected: bool,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test web_server OTA instance merging behavior."""
full_conf = {CONF_OTA: ota_configs.copy()}
token = fv.full_config.set(full_conf)
try:
with caplog.at_level(logging.WARNING):
_web_server_ota_final_validate({})
updated_conf = fv.full_config.get()
# Verify total number of OTA platforms
assert len(updated_conf[CONF_OTA]) == expected_count
# Verify warning
if warning_expected:
assert any(
"Found and merged" in record.message
and "web_server OTA" in record.message
for record in caplog.records
), "Expected merge warning not found in log"
else:
assert len(caplog.records) == 0, "Unexpected warnings logged"
finally:
fv.full_config.reset(token)
def test_web_server_ota_inconsistent_manual_ids() -> None:
"""Test that inconsistent manual IDs raise an error."""
ota_configs = [
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_1", is_manual=True),
},
{
CONF_PLATFORM: CONF_WEB_SERVER,
CONF_ID: ID("ota_web_2", is_manual=True),
},
]
full_conf = {CONF_OTA: ota_configs}
token = fv.full_config.set(full_conf)
try:
with pytest.raises(
cv.Invalid,
match="Found multiple web_server OTA configurations but id is inconsistent",
):
_web_server_ota_final_validate({})
finally:
fv.full_config.reset(token)