1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-30 14:43:51 +00:00

Merge branch 'min_filter_ring_buffer' into integration

This commit is contained in:
J. Nick Koston
2025-10-15 22:13:26 -10:00
105 changed files with 3136 additions and 688 deletions

View File

@@ -96,6 +96,13 @@ def mock_run_git_command() -> Generator[Mock, None, None]:
yield mock
@pytest.fixture
def mock_subprocess_run() -> Generator[Mock, None, None]:
"""Mock subprocess.run for testing."""
with patch("subprocess.run") as mock:
yield mock
@pytest.fixture
def mock_get_idedata() -> Generator[Mock, None, None]:
"""Mock get_idedata for platformio_api."""

View File

@@ -1,16 +1,204 @@
"""Tests for git.py module."""
from datetime import datetime, timedelta
import hashlib
import os
from pathlib import Path
from typing import Any
from unittest.mock import Mock
import pytest
from esphome import git
import esphome.config_validation as cv
from esphome.core import CORE, TimePeriodSeconds
from esphome.git import GitCommandError
def _compute_repo_dir(url: str, ref: str | None, domain: str) -> Path:
"""Helper to compute the expected repo directory path using git module's logic."""
key = f"{url}@{ref}"
return git._compute_destination_path(key, domain)
def _setup_old_repo(repo_dir: Path, days_old: int = 2) -> None:
"""Helper to set up a git repo directory structure with an old timestamp.
Args:
repo_dir: The repository directory path to create.
days_old: Number of days old to make the FETCH_HEAD file (default: 2).
"""
# Create repo directory
repo_dir.mkdir(parents=True)
git_dir = repo_dir / ".git"
git_dir.mkdir()
# Create FETCH_HEAD file with old timestamp
fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test")
old_time = datetime.now() - timedelta(days=days_old)
fetch_head.touch()
os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp()))
def _get_git_command_type(cmd: list[str]) -> str | None:
"""Helper to determine the type of git command from a command list.
Args:
cmd: The git command list (e.g., ["git", "rev-parse", "HEAD"]).
Returns:
The command type ("rev-parse", "stash", "fetch", "reset", "clone") or None.
"""
# Git commands are always in format ["git", "command", ...], so check index 1
if len(cmd) > 1:
return cmd[1]
return None
def test_run_git_command_success(tmp_path: Path) -> None:
"""Test that run_git_command returns output on success."""
# Create a simple git repo to test with
repo_dir = tmp_path / "test_repo"
repo_dir.mkdir()
# Initialize a git repo
result = git.run_git_command(["git", "init"], str(repo_dir))
assert "Initialized empty Git repository" in result or result == ""
# Verify we can run a command and get output
result = git.run_git_command(["git", "status", "--porcelain"], str(repo_dir))
# Empty repo should have empty status
assert isinstance(result, str)
def test_run_git_command_with_git_dir_isolation(
tmp_path: Path, mock_subprocess_run: Mock
) -> None:
"""Test that git_dir parameter properly isolates git operations."""
repo_dir = tmp_path / "test_repo"
repo_dir.mkdir()
git_dir = repo_dir / ".git"
git_dir.mkdir()
# Configure mock to return success
mock_subprocess_run.return_value = Mock(
returncode=0,
stdout=b"test output",
stderr=b"",
)
result = git.run_git_command(
["git", "rev-parse", "HEAD"],
git_dir=repo_dir,
)
# Verify subprocess.run was called
assert mock_subprocess_run.called
call_args = mock_subprocess_run.call_args
# Verify environment was set
env = call_args[1]["env"]
assert "GIT_DIR" in env
assert "GIT_WORK_TREE" in env
assert env["GIT_DIR"] == str(repo_dir / ".git")
assert env["GIT_WORK_TREE"] == str(repo_dir)
assert result == "test output"
def test_run_git_command_raises_git_not_installed_error(
tmp_path: Path, mock_subprocess_run: Mock
) -> None:
"""Test that FileNotFoundError is converted to GitNotInstalledError."""
from esphome.git import GitNotInstalledError
repo_dir = tmp_path / "test_repo"
# Configure mock to raise FileNotFoundError
mock_subprocess_run.side_effect = FileNotFoundError("git not found")
with pytest.raises(GitNotInstalledError, match="git is not installed"):
git.run_git_command(["git", "status"], git_dir=repo_dir)
def test_run_git_command_raises_git_command_error_on_failure(
tmp_path: Path, mock_subprocess_run: Mock
) -> None:
"""Test that failed git commands raise GitCommandError."""
repo_dir = tmp_path / "test_repo"
# Configure mock to return non-zero exit code
mock_subprocess_run.return_value = Mock(
returncode=1,
stdout=b"",
stderr=b"fatal: not a git repository",
)
with pytest.raises(GitCommandError, match="not a git repository"):
git.run_git_command(["git", "status"], git_dir=repo_dir)
def test_run_git_command_strips_fatal_prefix(
tmp_path: Path, mock_subprocess_run: Mock
) -> None:
"""Test that 'fatal: ' prefix is stripped from error messages."""
repo_dir = tmp_path / "test_repo"
# Configure mock to return error with "fatal: " prefix
mock_subprocess_run.return_value = Mock(
returncode=128,
stdout=b"",
stderr=b"fatal: repository not found\n",
)
with pytest.raises(GitCommandError) as exc_info:
git.run_git_command(["git", "clone", "invalid-url"], git_dir=repo_dir)
# Error message should NOT include "fatal: " prefix
assert "fatal:" not in str(exc_info.value)
assert "repository not found" in str(exc_info.value)
def test_run_git_command_without_git_dir(mock_subprocess_run: Mock) -> None:
"""Test that run_git_command works without git_dir (clone case)."""
# Configure mock to return success
mock_subprocess_run.return_value = Mock(
returncode=0,
stdout=b"Cloning into 'test_repo'...",
stderr=b"",
)
result = git.run_git_command(["git", "clone", "https://github.com/test/repo"])
# Verify subprocess.run was called
assert mock_subprocess_run.called
call_args = mock_subprocess_run.call_args
# Verify environment does NOT have GIT_DIR or GIT_WORK_TREE set
# (it should use the default environment or None)
env = call_args[1].get("env")
if env is not None:
assert "GIT_DIR" not in env
assert "GIT_WORK_TREE" not in env
# Verify cwd is None (default)
assert call_args[1].get("cwd") is None
assert result == "Cloning into 'test_repo'..."
def test_run_git_command_without_git_dir_raises_error(
mock_subprocess_run: Mock,
) -> None:
"""Test that run_git_command without git_dir can still raise errors."""
# Configure mock to return error
mock_subprocess_run.return_value = Mock(
returncode=128,
stdout=b"",
stderr=b"fatal: repository not found\n",
)
with pytest.raises(GitCommandError, match="repository not found"):
git.run_git_command(["git", "clone", "https://invalid.url/repo.git"])
def test_clone_or_update_with_never_refresh(
@@ -20,16 +208,10 @@ def test_clone_or_update_with_never_refresh(
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
# Compute the expected repo directory path
url = "https://github.com/test/repo"
ref = None
key = f"{url}@{ref}"
domain = "test"
# Compute hash-based directory name (matching _compute_destination_path logic)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
repo_dir = _compute_repo_dir(url, ref, domain)
# Create the git repo directory structure
repo_dir.mkdir(parents=True)
@@ -61,16 +243,10 @@ def test_clone_or_update_with_refresh_updates_old_repo(
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
# Compute the expected repo directory path
url = "https://github.com/test/repo"
ref = None
key = f"{url}@{ref}"
domain = "test"
# Compute hash-based directory name (matching _compute_destination_path logic)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
repo_dir = _compute_repo_dir(url, ref, domain)
# Create the git repo directory structure
repo_dir.mkdir(parents=True)
@@ -115,16 +291,10 @@ def test_clone_or_update_with_refresh_skips_fresh_repo(
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
# Compute the expected repo directory path
url = "https://github.com/test/repo"
ref = None
key = f"{url}@{ref}"
domain = "test"
# Compute hash-based directory name (matching _compute_destination_path logic)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
repo_dir = _compute_repo_dir(url, ref, domain)
# Create the git repo directory structure
repo_dir.mkdir(parents=True)
@@ -161,16 +331,10 @@ def test_clone_or_update_clones_missing_repo(
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
# Compute the expected repo directory path
url = "https://github.com/test/repo"
ref = None
key = f"{url}@{ref}"
domain = "test"
# Compute hash-based directory name (matching _compute_destination_path logic)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
repo_dir = _compute_repo_dir(url, ref, domain)
# Create base directory but NOT the repo itself
base_dir = tmp_path / ".esphome" / domain
@@ -203,16 +367,10 @@ def test_clone_or_update_with_none_refresh_always_updates(
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
# Compute the expected repo directory path
url = "https://github.com/test/repo"
ref = None
key = f"{url}@{ref}"
domain = "test"
# Compute hash-based directory name (matching _compute_destination_path logic)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
repo_dir = _compute_repo_dir(url, ref, domain)
# Create the git repo directory structure
repo_dir.mkdir(parents=True)
@@ -273,40 +431,20 @@ def test_clone_or_update_recovers_from_git_failures(
url = "https://github.com/test/repo"
ref = "main"
key = f"{url}@{ref}"
domain = "test"
repo_dir = _compute_repo_dir(url, ref, domain)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
# Create repo directory
repo_dir.mkdir(parents=True)
git_dir = repo_dir / ".git"
git_dir.mkdir()
fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test")
old_time = datetime.now() - timedelta(days=2)
fetch_head.touch()
os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp()))
# Use helper to set up old repo
_setup_old_repo(repo_dir)
# Track command call counts to make first call fail, subsequent calls succeed
call_counts: dict[str, int] = {}
def git_command_side_effect(cmd: list[str], cwd: str | None = None) -> str:
def git_command_side_effect(
cmd: list[str], cwd: str | None = None, **kwargs: Any
) -> str:
# Determine which command this is
cmd_type = None
if "rev-parse" in cmd:
cmd_type = "rev-parse"
elif "stash" in cmd:
cmd_type = "stash"
elif "fetch" in cmd:
cmd_type = "fetch"
elif "reset" in cmd:
cmd_type = "reset"
elif "clone" in cmd:
cmd_type = "clone"
cmd_type = _get_git_command_type(cmd)
# Track call count for this command type
if cmd_type:
@@ -314,7 +452,7 @@ def test_clone_or_update_recovers_from_git_failures(
# Fail on first call to the specified command, succeed on subsequent calls
if cmd_type == fail_command and call_counts[cmd_type] == 1:
raise cv.Invalid(error_message)
raise GitCommandError(error_message)
# Default successful responses
if cmd_type == "rev-parse":
@@ -353,34 +491,25 @@ def test_clone_or_update_fails_when_recovery_also_fails(
url = "https://github.com/test/repo"
ref = "main"
key = f"{url}@{ref}"
domain = "test"
repo_dir = _compute_repo_dir(url, ref, domain)
h = hashlib.new("sha256")
h.update(key.encode())
repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]
# Create repo directory
repo_dir.mkdir(parents=True)
git_dir = repo_dir / ".git"
git_dir.mkdir()
fetch_head = git_dir / "FETCH_HEAD"
fetch_head.write_text("test")
old_time = datetime.now() - timedelta(days=2)
fetch_head.touch()
os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp()))
# Use helper to set up old repo
_setup_old_repo(repo_dir)
# Mock git command to fail on clone (simulating network failure during recovery)
def git_command_side_effect(cmd: list[str], cwd: str | None = None) -> str:
if "rev-parse" in cmd:
def git_command_side_effect(
cmd: list[str], cwd: str | None = None, **kwargs: Any
) -> str:
cmd_type = _get_git_command_type(cmd)
if cmd_type == "rev-parse":
# First time fails (broken repo)
raise cv.Invalid(
raise GitCommandError(
"ambiguous argument 'HEAD': unknown revision or path not in the working tree."
)
if "clone" in cmd:
if cmd_type == "clone":
# Clone also fails (recovery fails)
raise cv.Invalid("fatal: unable to access repository")
raise GitCommandError("fatal: unable to access repository")
return ""
mock_run_git_command.side_effect = git_command_side_effect
@@ -388,7 +517,7 @@ def test_clone_or_update_fails_when_recovery_also_fails(
refresh = TimePeriodSeconds(days=1)
# Should raise after one recovery attempt fails
with pytest.raises(cv.Invalid, match="fatal: unable to access repository"):
with pytest.raises(GitCommandError, match="fatal: unable to access repository"):
git.clone_or_update(
url=url,
ref=ref,
@@ -404,3 +533,141 @@ def test_clone_or_update_fails_when_recovery_also_fails(
# Should have tried rev-parse once (which failed and triggered recovery)
rev_parse_calls = [c for c in call_list if "rev-parse" in c[0][0]]
assert len(rev_parse_calls) == 1
def test_clone_or_update_recover_broken_flag_prevents_second_recovery(
tmp_path: Path, mock_run_git_command: Mock
) -> None:
"""Test that _recover_broken=False prevents a second recovery attempt (tests the raise path)."""
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
url = "https://github.com/test/repo"
ref = "main"
domain = "test"
repo_dir = _compute_repo_dir(url, ref, domain)
# Use helper to set up old repo
_setup_old_repo(repo_dir)
# Track fetch calls to differentiate between first (in clone) and second (in recovery update)
call_counts: dict[str, int] = {}
# Mock git command to fail on fetch during recovery's ref checkout
def git_command_side_effect(
cmd: list[str], cwd: str | None = None, **kwargs: Any
) -> str:
cmd_type = _get_git_command_type(cmd)
if cmd_type:
call_counts[cmd_type] = call_counts.get(cmd_type, 0) + 1
# First attempt: rev-parse fails (broken repo)
if cmd_type == "rev-parse" and call_counts[cmd_type] == 1:
raise GitCommandError(
"ambiguous argument 'HEAD': unknown revision or path not in the working tree."
)
# Recovery: clone succeeds
if cmd_type == "clone":
return ""
# Recovery: fetch for ref checkout fails
# This happens in the clone path when ref is not None (line 80 in git.py)
if cmd_type == "fetch" and call_counts[cmd_type] == 1:
raise GitCommandError("fatal: couldn't find remote ref main")
# Default success
return "abc123" if cmd_type == "rev-parse" else ""
mock_run_git_command.side_effect = git_command_side_effect
refresh = TimePeriodSeconds(days=1)
# Should raise on the fetch during recovery (when _recover_broken=False)
# This tests the critical "if not _recover_broken: raise" path
with pytest.raises(GitCommandError, match="fatal: couldn't find remote ref main"):
git.clone_or_update(
url=url,
ref=ref,
refresh=refresh,
domain=domain,
)
# Verify the sequence of events
call_list = mock_run_git_command.call_args_list
# Should have: rev-parse (fail, triggers recovery), clone (success),
# fetch (fail during ref checkout, raises because _recover_broken=False)
rev_parse_calls = [c for c in call_list if "rev-parse" in c[0][0]]
# Should have exactly one rev-parse call that failed
assert len(rev_parse_calls) == 1
clone_calls = [c for c in call_list if "clone" in c[0][0]]
# Should have exactly one clone call (the recovery attempt)
assert len(clone_calls) == 1
fetch_calls = [c for c in call_list if "fetch" in c[0][0]]
# Should have exactly one fetch call that failed (during ref checkout in recovery)
assert len(fetch_calls) == 1
def test_clone_or_update_recover_broken_flag_prevents_infinite_loop(
tmp_path: Path, mock_run_git_command: Mock
) -> None:
"""Test that _recover_broken=False prevents infinite recursion when repo persists."""
# This tests the critical "if not _recover_broken: raise" path at line 124-125
# Set up CORE.config_path so data_dir uses tmp_path
CORE.config_path = tmp_path / "test.yaml"
url = "https://github.com/test/repo"
ref = "main"
domain = "test"
repo_dir = _compute_repo_dir(url, ref, domain)
# Use helper to set up old repo
_setup_old_repo(repo_dir)
# Mock shutil.rmtree to NOT actually delete the directory
# This simulates a scenario where deletion fails (permissions, etc.)
import unittest.mock
def mock_rmtree(path, *args, **kwargs):
# Don't actually delete - this causes the recursive call to still see the repo
pass
# Mock git commands to always fail on stash
def git_command_side_effect(
cmd: list[str], cwd: str | None = None, **kwargs: Any
) -> str:
cmd_type = _get_git_command_type(cmd)
if cmd_type == "rev-parse":
return "abc123"
if cmd_type == "stash":
# Always fails
raise GitCommandError("fatal: unable to write new index file")
return ""
mock_run_git_command.side_effect = git_command_side_effect
refresh = TimePeriodSeconds(days=1)
# Mock shutil.rmtree and test
# Should raise on the second attempt when _recover_broken=False
# This hits the "if not _recover_broken: raise" path
with (
unittest.mock.patch("esphome.git.shutil.rmtree", side_effect=mock_rmtree),
pytest.raises(GitCommandError, match="fatal: unable to write new index file"),
):
git.clone_or_update(
url=url,
ref=ref,
refresh=refresh,
domain=domain,
)
# Verify the sequence: stash fails twice (once triggering recovery, once raising)
call_list = mock_run_git_command.call_args_list
stash_calls = [c for c in call_list if "stash" in c[0][0]]
# Should have exactly two stash calls
assert len(stash_calls) == 2

View File

@@ -2,9 +2,12 @@ import glob
import logging
from pathlib import Path
from esphome import yaml_util
from esphome import config as config_module, yaml_util
from esphome.components import substitutions
from esphome.const import CONF_PACKAGES
from esphome.config_helpers import merge_config
from esphome.const import CONF_PACKAGES, CONF_SUBSTITUTIONS
from esphome.core import CORE
from esphome.util import OrderedDict
_LOGGER = logging.getLogger(__name__)
@@ -118,3 +121,200 @@ def test_substitutions_fixtures(fixture_path):
if DEV_MODE:
_LOGGER.error("Tests passed, but Dev mode is enabled.")
assert not DEV_MODE # make sure DEV_MODE is disabled after you are finished.
def test_substitutions_with_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions remain an OrderedDict when command line substitutions are provided,
and that move_to_end() can be called successfully.
This is a regression test for https://github.com/esphome/esphome/issues/11182
where the config would become a regular dict and fail when move_to_end() was called.
"""
# Create an OrderedDict config with substitutions
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1", "var2": "value2"}
config["other_key"] = "other_value"
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Call do_substitution_pass with command line substitutions
substitutions.do_substitution_pass(config, command_line_subs)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning (move_to_end with last=False)
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
# Verify substitutions were properly merged
assert config[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert config[CONF_SUBSTITUTIONS]["var2"] == "override"
assert config[CONF_SUBSTITUTIONS]["var3"] == "new_value"
# Verify config[CONF_SUBSTITUTIONS] is also an OrderedDict
assert isinstance(config[CONF_SUBSTITUTIONS], OrderedDict), (
"Substitutions should be an OrderedDict"
)
def test_substitutions_without_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions work correctly without command line substitutions."""
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
config["other_key"] = "other_value"
# Call without command line substitutions
substitutions.do_substitution_pass(config, None)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_substitutions_after_merge_config_maintains_ordered_dict() -> None:
"""Test that substitutions work after merge_config (packages scenario).
This is a regression test for https://github.com/esphome/esphome/issues/11182
where using packages would cause config to become a regular dict, breaking move_to_end().
"""
# Simulate what happens with packages - merge two OrderedDict configs
base_config = OrderedDict()
base_config["esphome"] = {"name": "base"}
base_config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
package_config = OrderedDict()
package_config["sensor"] = [{"platform": "template"}]
package_config[CONF_SUBSTITUTIONS] = {"var2": "value2"}
# Merge configs (simulating package merge)
merged_config = merge_config(base_config, package_config)
# Verify merged config is still an OrderedDict
assert isinstance(merged_config, OrderedDict), (
"Merged config should be an OrderedDict"
)
# Now try to run substitution pass on the merged config
substitutions.do_substitution_pass(merged_config, None)
# Should not raise AttributeError
assert isinstance(merged_config, OrderedDict), (
"Config should still be OrderedDict after substitution pass"
)
keys = list(merged_config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_validate_config_with_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict when merging command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() with command-line substitutions provided.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config with command line substitutions
result = config_module.validate_config(test_config, command_line_subs)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions were properly merged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "override"
assert result[CONF_SUBSTITUTIONS]["var3"] == "new_value"
def test_validate_config_without_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict without command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() when command_line_substitutions is None.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config without command line substitutions
result = config_module.validate_config(test_config, None)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions are unchanged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "value2"
def test_merge_config_preserves_ordered_dict() -> None:
"""Test that merge_config preserves OrderedDict type.
This is a regression test to ensure merge_config doesn't lose OrderedDict type
when merging configs, which causes AttributeError on move_to_end().
"""
# Test OrderedDict + dict = OrderedDict
od = OrderedDict([("a", 1), ("b", 2)])
d = {"b": 20, "c": 3}
result = merge_config(od, d)
assert isinstance(result, OrderedDict), (
"OrderedDict + dict should return OrderedDict"
)
# Test dict + OrderedDict = OrderedDict
d = {"a": 1, "b": 2}
od = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(d, od)
assert isinstance(result, OrderedDict), (
"dict + OrderedDict should return OrderedDict"
)
# Test OrderedDict + OrderedDict = OrderedDict
od1 = OrderedDict([("a", 1), ("b", 2)])
od2 = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(od1, od2)
assert isinstance(result, OrderedDict), (
"OrderedDict + OrderedDict should return OrderedDict"
)
# Test that dict + dict still returns regular dict (no unnecessary conversion)
d1 = {"a": 1, "b": 2}
d2 = {"b": 20, "c": 3}
result = merge_config(d1, d2)
assert isinstance(result, dict), "dict + dict should return dict"
assert not isinstance(result, OrderedDict), (
"dict + dict should not return OrderedDict"
)