1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-18 01:33:51 +01:00

Merge branch 'dev' into api_size_limits

This commit is contained in:
J. Nick Koston
2025-10-05 22:29:00 -05:00
committed by GitHub
181 changed files with 4111 additions and 1841 deletions

View File

@@ -0,0 +1,194 @@
"""Tests for PSRAM component."""
from typing import Any
import pytest
from esphome.components.esp32.const import (
KEY_VARIANT,
VARIANT_ESP32,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
VARIANT_ESP32S2,
VARIANT_ESP32S3,
)
import esphome.config_validation as cv
from esphome.const import CONF_ESPHOME, PlatformFramework
from tests.component_tests.types import SetCoreConfigCallable
UNSUPPORTED_PSRAM_VARIANTS = [
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
]
SUPPORTED_PSRAM_VARIANTS = [
VARIANT_ESP32,
VARIANT_ESP32S2,
VARIANT_ESP32S3,
VARIANT_ESP32P4,
]
@pytest.mark.parametrize(
("config", "error_match"),
[
pytest.param(
{},
r"PSRAM is not supported on this chip",
id="psram_not_supported",
),
],
)
@pytest.mark.parametrize("variant", UNSUPPORTED_PSRAM_VARIANTS)
def test_psram_configuration_errors_unsupported_variants(
config: Any,
error_match: str,
variant: str,
set_core_config: SetCoreConfigCallable,
) -> None:
set_core_config(
PlatformFramework.ESP32_IDF,
platform_data={KEY_VARIANT: variant},
full_config={CONF_ESPHOME: {}},
)
"""Test detection of invalid PSRAM configuration on unsupported variants."""
from esphome.components.psram import CONFIG_SCHEMA
with pytest.raises(cv.Invalid, match=error_match):
CONFIG_SCHEMA(config)
@pytest.mark.parametrize("variant", SUPPORTED_PSRAM_VARIANTS)
def test_psram_configuration_valid_supported_variants(
variant: str,
set_core_config: SetCoreConfigCallable,
) -> None:
set_core_config(
PlatformFramework.ESP32_IDF,
platform_data={KEY_VARIANT: variant},
full_config={
CONF_ESPHOME: {},
"esp32": {
"variant": variant,
"cpu_frequency": "160MHz",
"framework": {"type": "esp-idf"},
},
},
)
"""Test that PSRAM configuration is valid on supported variants."""
from esphome.components.psram import CONFIG_SCHEMA, FINAL_VALIDATE_SCHEMA
# This should not raise an exception
config = CONFIG_SCHEMA({})
FINAL_VALIDATE_SCHEMA(config)
def _setup_psram_final_validation_test(
esp32_config: dict,
set_core_config: SetCoreConfigCallable,
set_component_config: Any,
) -> str:
"""Helper function to set up ESP32 configuration for PSRAM final validation tests."""
# Use ESP32S3 for schema validation to allow all options, then override for final validation
schema_variant = "ESP32S3"
final_variant = esp32_config.get("variant", "ESP32S3")
full_esp32_config = {
"variant": final_variant,
"cpu_frequency": esp32_config.get("cpu_frequency", "240MHz"),
"framework": {"type": "esp-idf"},
}
set_core_config(
PlatformFramework.ESP32_IDF,
platform_data={KEY_VARIANT: schema_variant},
full_config={
CONF_ESPHOME: {},
"esp32": full_esp32_config,
},
)
set_component_config("esp32", full_esp32_config)
return final_variant
@pytest.mark.parametrize(
("config", "esp32_config", "expect_error", "error_match"),
[
pytest.param(
{"speed": "120MHz"},
{"cpu_frequency": "160MHz"},
True,
r"PSRAM 120MHz requires 240MHz CPU frequency",
id="120mhz_requires_240mhz_cpu",
),
pytest.param(
{"mode": "octal"},
{"variant": "ESP32"},
True,
r"Octal PSRAM is only supported on ESP32-S3",
id="octal_mode_only_esp32s3",
),
pytest.param(
{"mode": "quad", "enable_ecc": True},
{},
True,
r"ECC is only available in octal mode",
id="ecc_only_in_octal_mode",
),
pytest.param(
{"speed": "120MHZ"},
{"cpu_frequency": "240MHZ"},
False,
None,
id="120mhz_with_240mhz_cpu",
),
pytest.param(
{"mode": "octal"},
{"variant": "ESP32S3"},
False,
None,
id="octal_mode_on_esp32s3",
),
pytest.param(
{"mode": "octal", "enable_ecc": True},
{"variant": "ESP32S3"},
False,
None,
id="ecc_in_octal_mode",
),
],
)
def test_psram_final_validation(
config: Any,
esp32_config: dict,
expect_error: bool,
error_match: str | None,
set_core_config: SetCoreConfigCallable,
set_component_config: Any,
) -> None:
"""Test PSRAM final validation for both error and valid cases."""
from esphome.components.psram import CONFIG_SCHEMA, FINAL_VALIDATE_SCHEMA
from esphome.core import CORE
final_variant = _setup_psram_final_validation_test(
esp32_config, set_core_config, set_component_config
)
validated_config = CONFIG_SCHEMA(config)
# Update CORE variant for final validation
CORE.data["esp32"][KEY_VARIANT] = final_variant
if expect_error:
with pytest.raises(cv.Invalid, match=error_match):
FINAL_VALIDATE_SCHEMA(validated_config)
else:
# This should not raise an exception
FINAL_VALIDATE_SCHEMA(validated_config)

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1,14 @@
deep_sleep:
run_duration:
default: 10s
gpio_wakeup_reason: 30s
touch_wakeup_reason: 15s
sleep_duration: 50s
wakeup_pin: ${wakeup_pin}
wakeup_pin_mode: INVERT_WAKEUP
esp32_ext1_wakeup:
pins:
- number: GPIO2
- number: GPIO13
mode: ANY_HIGH
touch_wakeup: true

View File

@@ -0,0 +1,12 @@
deep_sleep:
run_duration:
default: 10s
gpio_wakeup_reason: 30s
sleep_duration: 50s
wakeup_pin: ${wakeup_pin}
wakeup_pin_mode: INVERT_WAKEUP
esp32_ext1_wakeup:
pins:
- number: GPIO2
- number: GPIO5
mode: ANY_HIGH

View File

@@ -2,4 +2,4 @@ substitutions:
wakeup_pin: GPIO4
<<: !include common.yaml
<<: !include common-esp32.yaml
<<: !include common-esp32-ext1.yaml

View File

@@ -2,4 +2,4 @@ substitutions:
wakeup_pin: GPIO4
<<: !include common.yaml
<<: !include common-esp32.yaml
<<: !include common-esp32-all.yaml

View File

@@ -2,4 +2,4 @@ substitutions:
wakeup_pin: GPIO4
<<: !include common.yaml
<<: !include common-esp32.yaml
<<: !include common-esp32-all.yaml

View File

@@ -2,4 +2,4 @@ substitutions:
wakeup_pin: GPIO4
<<: !include common.yaml
<<: !include common-esp32.yaml
<<: !include common-esp32-all.yaml

View File

@@ -0,0 +1,89 @@
esphome:
on_boot:
then:
- canbus.send:
# Extended ID explicit
canbus_id: esp32_internal_can
use_extended_id: true
can_id: 0x100
data: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
- canbus.send:
# Standard ID by default
canbus_id: esp32_internal_can
can_id: 0x100
data: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
- canbus.send:
# Extended ID explicit
canbus_id: esp32_internal_can_2
use_extended_id: true
can_id: 0x100
data: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
- canbus.send:
# Standard ID by default
canbus_id: esp32_internal_can_2
can_id: 0x100
data: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
canbus:
- platform: esp32_can
id: esp32_internal_can
rx_pin: GPIO8
tx_pin: GPIO7
can_id: 4
bit_rate: 50kbps
on_frame:
- can_id: 500
then:
- lambda: |-
std::string b(x.begin(), x.end());
ESP_LOGD("canbus1", "canid 500 %s", b.c_str() );
- can_id: 0b00000000000000000000001000000
can_id_mask: 0b11111000000000011111111000000
use_extended_id: true
then:
- lambda: |-
auto pdo_id = can_id >> 14;
switch (pdo_id)
{
case 117:
ESP_LOGD("canbus1", "exhaust_fan_duty");
break;
case 118:
ESP_LOGD("canbus1", "supply_fan_duty");
break;
case 119:
ESP_LOGD("canbus1", "supply_fan_flow");
break;
// to be continued...
}
- platform: esp32_can
id: esp32_internal_can_2
rx_pin: GPIO10
tx_pin: GPIO9
can_id: 4
bit_rate: 50kbps
on_frame:
- can_id: 500
then:
- lambda: |-
std::string b(x.begin(), x.end());
ESP_LOGD("canbus2", "canid 500 %s", b.c_str() );
- can_id: 0b00000000000000000000001000000
can_id_mask: 0b11111000000000011111111000000
use_extended_id: true
then:
- lambda: |-
auto pdo_id = can_id >> 14;
switch (pdo_id)
{
case 117:
ESP_LOGD("canbus2", "exhaust_fan_duty");
break;
case 118:
ESP_LOGD("canbus2", "supply_fan_duty");
break;
case 119:
ESP_LOGD("canbus2", "supply_fan_flow");
break;
// to be continued...
}

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -12,3 +12,4 @@ ethernet:
gateway: 192.168.178.1
subnet: 255.255.255.0
domain: .local
mac_address: "02:AA:BB:CC:DD:01"

View File

@@ -6,11 +6,16 @@ esphome:
format: "Warning: Logger level is %d"
args: [id(logger_id).get_log_level()]
- logger.set_level: WARN
- logger.set_level:
level: ERROR
tag: mqtt.client
logger:
id: logger_id
level: DEBUG
initial_level: INFO
logs:
mqtt.component: WARN
select:
- platform: logger

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -11,6 +11,10 @@ sx126x:
pa_power: 3
bandwidth: 125_0kHz
crc_enable: true
crc_initial: 0x1D0F
crc_polynomial: 0x1021
crc_size: 2
crc_inverted: true
frequency: 433920000
modulation: LORA
rx_start: true

View File

@@ -341,6 +341,7 @@ datetime:
time:
- platform: sntp # Required for datetime
id: sntp_time
wifi: # Required for sntp time
ap:

View File

@@ -0,0 +1,6 @@
packages: !include common.yaml
time:
- id: !remove sntp_time
wifi: !remove

View File

@@ -0,0 +1,6 @@
packages: !include common.yaml
time:
- id: !remove sntp_time
wifi: !remove

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -2,20 +2,42 @@
from __future__ import annotations
from unittest.mock import Mock
from pathlib import Path
from unittest.mock import MagicMock, Mock
import pytest
import pytest_asyncio
from esphome.dashboard.core import ESPHomeDashboard
from esphome.dashboard.entries import DashboardEntries
@pytest.fixture
def mock_dashboard() -> Mock:
def mock_settings(tmp_path: Path) -> MagicMock:
"""Create mock dashboard settings."""
settings = MagicMock()
settings.config_dir = str(tmp_path)
settings.absolute_config_dir = tmp_path
return settings
@pytest.fixture
def mock_dashboard(mock_settings: MagicMock) -> Mock:
"""Create a mock dashboard."""
dashboard = Mock(spec=ESPHomeDashboard)
dashboard.settings = mock_settings
dashboard.entries = Mock()
dashboard.entries.async_all.return_value = []
dashboard.stop_event = Mock()
dashboard.stop_event.is_set.return_value = True
dashboard.ping_request = Mock()
dashboard.ignored_devices = set()
dashboard.bus = Mock()
dashboard.bus.async_fire = Mock()
return dashboard
@pytest_asyncio.fixture
async def dashboard_entries(mock_dashboard: Mock) -> DashboardEntries:
"""Create a DashboardEntries instance for testing."""
return DashboardEntries(mock_dashboard)

View File

@@ -8,7 +8,9 @@ import pytest
import pytest_asyncio
from zeroconf import AddressResolver, IPVersion
from esphome.dashboard.const import DashboardEvent
from esphome.dashboard.status.mdns import MDNSStatus
from esphome.zeroconf import DiscoveredImport
@pytest_asyncio.fixture
@@ -166,3 +168,73 @@ async def test_async_setup_failure(mock_dashboard: Mock) -> None:
result = mdns_status.async_setup()
assert result is False
assert mdns_status.aiozc is None
@pytest.mark.asyncio
async def test_on_import_update_device_added(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is added."""
# Create a DiscoveredImport object
discovered = DiscoveredImport(
device_name="test_device",
friendly_name="Test Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="wifi",
)
# Call _on_import_update with a device
mdns_status._on_import_update("test_device", discovered)
# Should fire IMPORTABLE_DEVICE_ADDED event
mock_dashboard = mdns_status.dashboard
mock_dashboard.bus.async_fire.assert_called_once()
call_args = mock_dashboard.bus.async_fire.call_args
assert call_args[0][0] == DashboardEvent.IMPORTABLE_DEVICE_ADDED
assert "device" in call_args[0][1]
device_data = call_args[0][1]["device"]
assert device_data["name"] == "test_device"
assert device_data["friendly_name"] == "Test Device"
assert device_data["project_name"] == "test_project"
assert device_data["ignored"] is False
@pytest.mark.asyncio
async def test_on_import_update_device_ignored(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is ignored."""
# Add device to ignored list
mdns_status.dashboard.ignored_devices.add("ignored_device")
# Create a DiscoveredImport object for ignored device
discovered = DiscoveredImport(
device_name="ignored_device",
friendly_name="Ignored Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="ethernet",
)
# Call _on_import_update with an ignored device
mdns_status._on_import_update("ignored_device", discovered)
# Should fire IMPORTABLE_DEVICE_ADDED event with ignored=True
mock_dashboard = mdns_status.dashboard
mock_dashboard.bus.async_fire.assert_called_once()
call_args = mock_dashboard.bus.async_fire.call_args
assert call_args[0][0] == DashboardEvent.IMPORTABLE_DEVICE_ADDED
device_data = call_args[0][1]["device"]
assert device_data["name"] == "ignored_device"
assert device_data["ignored"] is True
@pytest.mark.asyncio
async def test_on_import_update_device_removed(mdns_status: MDNSStatus) -> None:
"""Test _on_import_update when a device is removed."""
# Call _on_import_update with None (device removed)
mdns_status._on_import_update("removed_device", None)
# Should fire IMPORTABLE_DEVICE_REMOVED event
mdns_status.dashboard.bus.async_fire.assert_called_once_with(
DashboardEvent.IMPORTABLE_DEVICE_REMOVED, {"name": "removed_device"}
)

View File

@@ -2,14 +2,15 @@
from __future__ import annotations
import os
from pathlib import Path
import tempfile
from unittest.mock import MagicMock
from unittest.mock import Mock
import pytest
import pytest_asyncio
from esphome.core import CORE
from esphome.dashboard.const import DashboardEvent
from esphome.dashboard.entries import DashboardEntries, DashboardEntry
@@ -27,21 +28,6 @@ def setup_core():
CORE.reset()
@pytest.fixture
def mock_settings() -> MagicMock:
"""Create mock dashboard settings."""
settings = MagicMock()
settings.config_dir = "/test/config"
settings.absolute_config_dir = Path("/test/config")
return settings
@pytest_asyncio.fixture
async def dashboard_entries(mock_settings: MagicMock) -> DashboardEntries:
"""Create a DashboardEntries instance for testing."""
return DashboardEntries(mock_settings)
def test_dashboard_entry_path_initialization() -> None:
"""Test DashboardEntry initializes with path correctly."""
test_path = Path("/test/config/device.yaml")
@@ -78,15 +64,24 @@ def test_dashboard_entry_path_with_relative_path() -> None:
@pytest.mark.asyncio
async def test_dashboard_entries_get_by_path(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test getting entry by path."""
test_path = Path("/test/config/device.yaml")
entry = DashboardEntry(test_path, create_cache_key())
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
dashboard_entries._entries[str(test_path)] = entry
# Update entries to load the file
await dashboard_entries.async_update_entries()
result = dashboard_entries.get(str(test_path))
# Verify the entry was loaded
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
assert entry.path == test_file
# Also verify get() works with Path
result = dashboard_entries.get(test_file)
assert result == entry
@@ -101,45 +96,54 @@ async def test_dashboard_entries_get_nonexistent_path(
@pytest.mark.asyncio
async def test_dashboard_entries_path_normalization(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test that paths are handled consistently."""
path1 = Path("/test/config/device.yaml")
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
entry = DashboardEntry(path1, create_cache_key())
dashboard_entries._entries[str(path1)] = entry
# Update entries to load the file
await dashboard_entries.async_update_entries()
result = dashboard_entries.get(str(path1))
assert result == entry
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_spaces(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test handling paths with spaces."""
test_path = Path("/test/config/my device.yaml")
entry = DashboardEntry(test_path, create_cache_key())
# Create a test file with spaces in name
test_file = tmp_path / "my device.yaml"
test_file.write_text("test config")
dashboard_entries._entries[str(test_path)] = entry
# Update entries to load the file
await dashboard_entries.async_update_entries()
result = dashboard_entries.get(str(test_path))
assert result == entry
assert result.path == test_path
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
assert result.path == test_file
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_special_chars(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test handling paths with special characters."""
test_path = Path("/test/config/device-01_test.yaml")
entry = DashboardEntry(test_path, create_cache_key())
# Create a test file with special characters
test_file = tmp_path / "device-01_test.yaml"
test_file.write_text("test config")
dashboard_entries._entries[str(test_path)] = entry
# Update entries to load the file
await dashboard_entries.async_update_entries()
result = dashboard_entries.get(str(test_path))
assert result == entry
# Get the entry by path
result = dashboard_entries.get(test_file)
assert result is not None
def test_dashboard_entries_windows_path() -> None:
@@ -154,22 +158,25 @@ def test_dashboard_entries_windows_path() -> None:
@pytest.mark.asyncio
async def test_dashboard_entries_path_to_cache_key_mapping(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test internal entries storage with paths and cache keys."""
path1 = Path("/test/config/device1.yaml")
path2 = Path("/test/config/device2.yaml")
# Create test files
file1 = tmp_path / "device1.yaml"
file2 = tmp_path / "device2.yaml"
file1.write_text("test config 1")
file2.write_text("test config 2")
entry1 = DashboardEntry(path1, create_cache_key())
entry2 = DashboardEntry(path2, (1, 1, 1.0, 1))
# Update entries to load the files
await dashboard_entries.async_update_entries()
dashboard_entries._entries[str(path1)] = entry1
dashboard_entries._entries[str(path2)] = entry2
# Get entries and verify they have different cache keys
entry1 = dashboard_entries.get(file1)
entry2 = dashboard_entries.get(file2)
assert str(path1) in dashboard_entries._entries
assert str(path2) in dashboard_entries._entries
assert dashboard_entries._entries[str(path1)].cache_key == create_cache_key()
assert dashboard_entries._entries[str(path2)].cache_key == (1, 1, 1.0, 1)
assert entry1 is not None
assert entry2 is not None
assert entry1.cache_key != entry2.cache_key
def test_dashboard_entry_path_property() -> None:
@@ -183,21 +190,99 @@ def test_dashboard_entry_path_property() -> None:
@pytest.mark.asyncio
async def test_dashboard_entries_all_returns_entries_with_paths(
dashboard_entries: DashboardEntries,
dashboard_entries: DashboardEntries, tmp_path: Path
) -> None:
"""Test that all() returns entries with their paths intact."""
paths = [
Path("/test/config/device1.yaml"),
Path("/test/config/device2.yaml"),
Path("/test/config/subfolder/device3.yaml"),
# Create test files
files = [
tmp_path / "device1.yaml",
tmp_path / "device2.yaml",
tmp_path / "device3.yaml",
]
for path in paths:
entry = DashboardEntry(path, create_cache_key())
dashboard_entries._entries[str(path)] = entry
for file in files:
file.write_text("test config")
# Update entries to load the files
await dashboard_entries.async_update_entries()
all_entries = dashboard_entries.async_all()
assert len(all_entries) == len(paths)
assert len(all_entries) == len(files)
retrieved_paths = [entry.path for entry in all_entries]
assert set(retrieved_paths) == set(paths)
assert set(retrieved_paths) == set(files)
@pytest.mark.asyncio
async def test_async_update_entries_removed_path(
dashboard_entries: DashboardEntries, mock_dashboard: Mock, tmp_path: Path
) -> None:
"""Test that removed files trigger ENTRY_REMOVED event."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# First update to add the entry
await dashboard_entries.async_update_entries()
# Verify entry was added
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
# Delete the file
test_file.unlink()
# Second update to detect removal
await dashboard_entries.async_update_entries()
# Verify entry was removed
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 0
# Verify ENTRY_REMOVED event was fired
mock_dashboard.bus.async_fire.assert_any_call(
DashboardEvent.ENTRY_REMOVED, {"entry": entry}
)
@pytest.mark.asyncio
async def test_async_update_entries_updated_path(
dashboard_entries: DashboardEntries, mock_dashboard: Mock, tmp_path: Path
) -> None:
"""Test that modified files trigger ENTRY_UPDATED event."""
# Create a test file
test_file = tmp_path / "device.yaml"
test_file.write_text("test config")
# First update to add the entry
await dashboard_entries.async_update_entries()
# Verify entry was added
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
entry = all_entries[0]
original_cache_key = entry.cache_key
# Modify the file to change its mtime
test_file.write_text("updated config")
# Explicitly change the mtime to ensure it's different
stat = test_file.stat()
os.utime(test_file, (stat.st_atime, stat.st_mtime + 1))
# Second update to detect modification
await dashboard_entries.async_update_entries()
# Verify entry is still there with updated cache key
all_entries = dashboard_entries.async_all()
assert len(all_entries) == 1
updated_entry = all_entries[0]
assert updated_entry == entry # Same entry object
assert updated_entry.cache_key != original_cache_key # But cache key updated
# Verify ENTRY_UPDATED event was fired
mock_dashboard.bus.async_fire.assert_any_call(
DashboardEvent.ENTRY_UPDATED, {"entry": entry}
)

View File

@@ -2,11 +2,12 @@ from __future__ import annotations
import asyncio
from collections.abc import Generator
from contextlib import asynccontextmanager
import gzip
import json
import os
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
import pytest_asyncio
@@ -14,9 +15,19 @@ from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.testing import bind_unused_port
from tornado.websocket import WebSocketClientConnection, websocket_connect
from esphome.dashboard import web_server
from esphome.dashboard.const import DashboardEvent
from esphome.dashboard.core import DASHBOARD
from esphome.dashboard.entries import (
DashboardEntry,
EntryStateSource,
bool_to_entry_state,
)
from esphome.dashboard.models import build_importable_device_dict
from esphome.dashboard.web_server import DashboardSubscriber
from esphome.zeroconf import DiscoveredImport
from .common import get_fixture_path
@@ -126,6 +137,33 @@ async def dashboard() -> DashboardTestHelper:
io_loop.close()
@asynccontextmanager
async def websocket_connection(dashboard: DashboardTestHelper):
"""Async context manager for WebSocket connections."""
url = f"ws://127.0.0.1:{dashboard.port}/events"
ws = await websocket_connect(url)
try:
yield ws
finally:
if ws:
ws.close()
@pytest_asyncio.fixture
async def websocket_client(dashboard: DashboardTestHelper) -> WebSocketClientConnection:
"""Create a WebSocket connection for testing."""
url = f"ws://127.0.0.1:{dashboard.port}/events"
ws = await websocket_connect(url)
# Read and discard initial state message
await ws.read_message()
yield ws
if ws:
ws.close()
@pytest.mark.asyncio
async def test_main_page(dashboard: DashboardTestHelper) -> None:
response = await dashboard.fetch("/")
@@ -810,3 +848,457 @@ def test_build_cache_arguments_name_without_address(mock_dashboard: Mock) -> Non
mock_dashboard.mdns_status.get_cached_addresses.assert_called_once_with(
"my-device.local"
)
@pytest.mark.asyncio
async def test_websocket_connection_initial_state(
dashboard: DashboardTestHelper,
) -> None:
"""Test WebSocket connection and initial state."""
async with websocket_connection(dashboard) as ws:
# Should receive initial state with configured and importable devices
msg = await ws.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "initial_state"
assert "devices" in data["data"]
assert "configured" in data["data"]["devices"]
assert "importable" in data["data"]["devices"]
# Check configured devices
configured = data["data"]["devices"]["configured"]
assert len(configured) > 0
assert configured[0]["name"] == "pico" # From test fixtures
@pytest.mark.asyncio
async def test_websocket_ping_pong(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket ping/pong mechanism."""
# Send ping
await websocket_client.write_message(json.dumps({"event": "ping"}))
# Should receive pong
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "pong"
@pytest.mark.asyncio
async def test_websocket_invalid_json(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket handling of invalid JSON."""
# Send invalid JSON
await websocket_client.write_message("not valid json {]")
# Send a valid ping to verify connection is still alive
await websocket_client.write_message(json.dumps({"event": "ping"}))
# Should receive pong, confirming the connection wasn't closed by invalid JSON
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "pong"
@pytest.mark.asyncio
async def test_websocket_authentication_required(
dashboard: DashboardTestHelper,
) -> None:
"""Test WebSocket authentication when auth is required."""
with patch(
"esphome.dashboard.web_server.is_authenticated"
) as mock_is_authenticated:
mock_is_authenticated.return_value = False
# Try to connect - should be rejected with 401
url = f"ws://127.0.0.1:{dashboard.port}/events"
with pytest.raises(HTTPClientError) as exc_info:
await websocket_connect(url)
# Should get HTTP 401 Unauthorized
assert exc_info.value.code == 401
@pytest.mark.asyncio
async def test_websocket_authentication_not_required(
dashboard: DashboardTestHelper,
) -> None:
"""Test WebSocket connection when no auth is required."""
with patch(
"esphome.dashboard.web_server.is_authenticated"
) as mock_is_authenticated:
mock_is_authenticated.return_value = True
# Should be able to connect successfully
async with websocket_connection(dashboard) as ws:
msg = await ws.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "initial_state"
@pytest.mark.asyncio
async def test_websocket_entry_state_changed(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket entry state changed event."""
# Simulate entry state change
entry = DASHBOARD.entries.async_all()[0]
state = bool_to_entry_state(True, EntryStateSource.MDNS)
DASHBOARD.bus.async_fire(
DashboardEvent.ENTRY_STATE_CHANGED, {"entry": entry, "state": state}
)
# Should receive state change event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "entry_state_changed"
assert data["data"]["filename"] == entry.filename
assert data["data"]["name"] == entry.name
assert data["data"]["state"] is True
@pytest.mark.asyncio
async def test_websocket_entry_added(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket entry added event."""
# Create a mock entry
mock_entry = Mock(spec=DashboardEntry)
mock_entry.filename = "test.yaml"
mock_entry.name = "test_device"
mock_entry.to_dict.return_value = {
"name": "test_device",
"filename": "test.yaml",
"configuration": "test.yaml",
}
# Simulate entry added
DASHBOARD.bus.async_fire(DashboardEvent.ENTRY_ADDED, {"entry": mock_entry})
# Should receive entry added event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "entry_added"
assert data["data"]["device"]["name"] == "test_device"
assert data["data"]["device"]["filename"] == "test.yaml"
@pytest.mark.asyncio
async def test_websocket_entry_removed(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket entry removed event."""
# Create a mock entry
mock_entry = Mock(spec=DashboardEntry)
mock_entry.filename = "removed.yaml"
mock_entry.name = "removed_device"
mock_entry.to_dict.return_value = {
"name": "removed_device",
"filename": "removed.yaml",
"configuration": "removed.yaml",
}
# Simulate entry removed
DASHBOARD.bus.async_fire(DashboardEvent.ENTRY_REMOVED, {"entry": mock_entry})
# Should receive entry removed event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "entry_removed"
assert data["data"]["device"]["name"] == "removed_device"
assert data["data"]["device"]["filename"] == "removed.yaml"
@pytest.mark.asyncio
async def test_websocket_importable_device_added(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket importable device added event with real DiscoveredImport."""
# Create a real DiscoveredImport object
discovered = DiscoveredImport(
device_name="new_import_device",
friendly_name="New Import Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="wifi",
)
# Directly fire the event as the mDNS system would
device_dict = build_importable_device_dict(DASHBOARD, discovered)
DASHBOARD.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_ADDED, {"device": device_dict}
)
# Should receive importable device added event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "importable_device_added"
assert data["data"]["device"]["name"] == "new_import_device"
assert data["data"]["device"]["friendly_name"] == "New Import Device"
assert data["data"]["device"]["project_name"] == "test_project"
assert data["data"]["device"]["network"] == "wifi"
assert data["data"]["device"]["ignored"] is False
@pytest.mark.asyncio
async def test_websocket_importable_device_added_ignored(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket importable device added event for ignored device."""
# Add device to ignored list
DASHBOARD.ignored_devices.add("ignored_device")
# Create a real DiscoveredImport object
discovered = DiscoveredImport(
device_name="ignored_device",
friendly_name="Ignored Device",
package_import_url="https://example.com/package",
project_name="test_project",
project_version="1.0.0",
network="ethernet",
)
# Directly fire the event as the mDNS system would
device_dict = build_importable_device_dict(DASHBOARD, discovered)
DASHBOARD.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_ADDED, {"device": device_dict}
)
# Should receive importable device added event with ignored=True
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "importable_device_added"
assert data["data"]["device"]["name"] == "ignored_device"
assert data["data"]["device"]["friendly_name"] == "Ignored Device"
assert data["data"]["device"]["network"] == "ethernet"
assert data["data"]["device"]["ignored"] is True
@pytest.mark.asyncio
async def test_websocket_importable_device_removed(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket importable device removed event."""
# Simulate importable device removed
DASHBOARD.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_REMOVED,
{"name": "removed_import_device"},
)
# Should receive importable device removed event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "importable_device_removed"
assert data["data"]["name"] == "removed_import_device"
@pytest.mark.asyncio
async def test_websocket_importable_device_already_configured(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test that importable device event is not sent if device is already configured."""
# Get an existing configured device name
existing_entry = DASHBOARD.entries.async_all()[0]
# Simulate importable device added with same name as configured device
DASHBOARD.bus.async_fire(
DashboardEvent.IMPORTABLE_DEVICE_ADDED,
{
"device": {
"name": existing_entry.name,
"friendly_name": "Should Not Be Sent",
"package_import_url": "https://example.com/package",
"project_name": "test_project",
"project_version": "1.0.0",
"network": "wifi",
}
},
)
# Send a ping to ensure connection is still alive
await websocket_client.write_message(json.dumps({"event": "ping"}))
# Should only receive pong, not the importable device event
msg = await websocket_client.read_message()
assert msg is not None
data = json.loads(msg)
assert data["event"] == "pong"
@pytest.mark.asyncio
async def test_websocket_multiple_connections(dashboard: DashboardTestHelper) -> None:
"""Test multiple WebSocket connections."""
async with (
websocket_connection(dashboard) as ws1,
websocket_connection(dashboard) as ws2,
):
# Both should receive initial state
msg1 = await ws1.read_message()
assert msg1 is not None
data1 = json.loads(msg1)
assert data1["event"] == "initial_state"
msg2 = await ws2.read_message()
assert msg2 is not None
data2 = json.loads(msg2)
assert data2["event"] == "initial_state"
# Fire an event - both should receive it
entry = DASHBOARD.entries.async_all()[0]
state = bool_to_entry_state(False, EntryStateSource.MDNS)
DASHBOARD.bus.async_fire(
DashboardEvent.ENTRY_STATE_CHANGED, {"entry": entry, "state": state}
)
msg1 = await ws1.read_message()
assert msg1 is not None
data1 = json.loads(msg1)
assert data1["event"] == "entry_state_changed"
msg2 = await ws2.read_message()
assert msg2 is not None
data2 = json.loads(msg2)
assert data2["event"] == "entry_state_changed"
@pytest.mark.asyncio
async def test_dashboard_subscriber_lifecycle(dashboard: DashboardTestHelper) -> None:
"""Test DashboardSubscriber lifecycle."""
subscriber = DashboardSubscriber()
# Initially no subscribers
assert len(subscriber._subscribers) == 0
assert subscriber._event_loop_task is None
# Add a subscriber
mock_websocket = Mock()
unsubscribe = subscriber.subscribe(mock_websocket)
# Should have started the event loop task
assert len(subscriber._subscribers) == 1
assert subscriber._event_loop_task is not None
# Unsubscribe
unsubscribe()
# Should have stopped the task
assert len(subscriber._subscribers) == 0
@pytest.mark.asyncio
async def test_dashboard_subscriber_entries_update_interval(
dashboard: DashboardTestHelper,
) -> None:
"""Test DashboardSubscriber entries update interval."""
# Patch the constants to make the test run faster
with (
patch("esphome.dashboard.web_server.DASHBOARD_POLL_INTERVAL", 0.01),
patch("esphome.dashboard.web_server.DASHBOARD_ENTRIES_UPDATE_ITERATIONS", 2),
patch("esphome.dashboard.web_server.settings") as mock_settings,
patch("esphome.dashboard.web_server.DASHBOARD") as mock_dashboard,
):
mock_settings.status_use_mqtt = False
# Mock dashboard dependencies
mock_dashboard.ping_request = Mock()
mock_dashboard.ping_request.set = Mock()
mock_dashboard.entries = Mock()
mock_dashboard.entries.async_request_update_entries = Mock()
subscriber = DashboardSubscriber()
mock_websocket = Mock()
# Subscribe to start the event loop
unsubscribe = subscriber.subscribe(mock_websocket)
# Wait for a few iterations to ensure entries update is called
await asyncio.sleep(0.05) # Should be enough for 2+ iterations
# Unsubscribe to stop the task
unsubscribe()
# Verify entries update was called
assert mock_dashboard.entries.async_request_update_entries.call_count >= 1
# Verify ping request was set multiple times
assert mock_dashboard.ping_request.set.call_count >= 2
@pytest.mark.asyncio
async def test_websocket_refresh_command(
dashboard: DashboardTestHelper, websocket_client: WebSocketClientConnection
) -> None:
"""Test WebSocket refresh command triggers dashboard update."""
with patch("esphome.dashboard.web_server.DASHBOARD_SUBSCRIBER") as mock_subscriber:
mock_subscriber.request_refresh = Mock()
# Send refresh command
await websocket_client.write_message(json.dumps({"event": "refresh"}))
# Give it a moment to process
await asyncio.sleep(0.01)
# Verify request_refresh was called
mock_subscriber.request_refresh.assert_called_once()
@pytest.mark.asyncio
async def test_dashboard_subscriber_refresh_event(
dashboard: DashboardTestHelper,
) -> None:
"""Test DashboardSubscriber refresh event triggers immediate update."""
# Patch the constants to make the test run faster
with (
patch(
"esphome.dashboard.web_server.DASHBOARD_POLL_INTERVAL", 1.0
), # Long timeout
patch(
"esphome.dashboard.web_server.DASHBOARD_ENTRIES_UPDATE_ITERATIONS", 100
), # Won't reach naturally
patch("esphome.dashboard.web_server.settings") as mock_settings,
patch("esphome.dashboard.web_server.DASHBOARD") as mock_dashboard,
):
mock_settings.status_use_mqtt = False
# Mock dashboard dependencies
mock_dashboard.ping_request = Mock()
mock_dashboard.ping_request.set = Mock()
mock_dashboard.entries = Mock()
mock_dashboard.entries.async_request_update_entries = AsyncMock()
subscriber = DashboardSubscriber()
mock_websocket = Mock()
# Subscribe to start the event loop
unsubscribe = subscriber.subscribe(mock_websocket)
# Wait a bit to ensure loop is running
await asyncio.sleep(0.01)
# Verify entries update hasn't been called yet (iterations not reached)
assert mock_dashboard.entries.async_request_update_entries.call_count == 0
# Request refresh
subscriber.request_refresh()
# Wait for the refresh to be processed
await asyncio.sleep(0.01)
# Now entries update should have been called
assert mock_dashboard.entries.async_request_update_entries.call_count == 1
# Unsubscribe to stop the task
unsubscribe()
# Give it a moment to clean up
await asyncio.sleep(0.01)

View File

@@ -5,6 +5,9 @@ substitutions:
var21: '79'
value: 33
values: 44
position:
x: 79
y: 82
esphome:
name: test
@@ -26,3 +29,7 @@ test_list:
- Literal $values ${are not substituted}
- ["list $value", "${is not}", "${substituted}"]
- {"$dictionary": "$value", "${is not}": "${substituted}"}
- |-
{{{ "x", "79"}, { "y", "82"}}}
- '{{{"AA"}}}'
- '"HELLO"'

View File

@@ -8,6 +8,9 @@ substitutions:
var21: "79"
value: 33
values: 44
position:
x: 79
y: 82
test_list:
- "$var1"
@@ -27,3 +30,7 @@ test_list:
- !literal Literal $values ${are not substituted}
- !literal ["list $value", "${is not}", "${substituted}"]
- !literal {"$dictionary": "$value", "${is not}": "${substituted}"}
- |- # Test parsing things that look like a python set of sets when rendered:
{{{ "x", "${ position.x }"}, { "y", "${ position.y }"}}}
- ${ '{{{"AA"}}}' }
- ${ '"HELLO"' }

View File

@@ -790,16 +790,21 @@ def test_clean_all(
with caplog.at_level("INFO"):
clean_all([str(config1_dir), str(config2_dir)])
# Verify deletions
assert not build_dir1.exists()
assert not build_dir2.exists()
# Verify deletions - .esphome directories remain but contents are cleaned
# The .esphome directory itself is not removed because it may contain storage
assert build_dir1.exists()
assert build_dir2.exists()
# Verify that files in .esphome were removed
assert not (build_dir1 / "dummy.txt").exists()
assert not (build_dir2 / "dummy.txt").exists()
assert not pio_cache.exists()
assert not pio_packages.exists()
assert not pio_platforms.exists()
assert not pio_core.exists()
# Verify logging mentions each
assert "Deleting" in caplog.text
assert "Cleaning" in caplog.text
assert str(build_dir1) in caplog.text
assert str(build_dir2) in caplog.text
assert "PlatformIO cache" in caplog.text
@@ -808,6 +813,55 @@ def test_clean_all(
assert "PlatformIO core" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_all_preserves_storage(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_all preserves storage directory."""
# Create build directory with storage subdirectory
config_dir = tmp_path / "config"
config_dir.mkdir()
build_dir = config_dir / ".esphome"
build_dir.mkdir()
(build_dir / "dummy.txt").write_text("x")
(build_dir / "other_file.txt").write_text("y")
# Create storage directory with content
storage_dir = build_dir / "storage"
storage_dir.mkdir()
(storage_dir / "storage.json").write_text('{"test": "data"}')
(storage_dir / "other_storage.txt").write_text("storage content")
# Call clean_all
from esphome.writer import clean_all
with caplog.at_level("INFO"):
clean_all([str(config_dir)])
# Verify .esphome directory still exists
assert build_dir.exists()
# Verify storage directory still exists with its contents
assert storage_dir.exists()
assert (storage_dir / "storage.json").exists()
assert (storage_dir / "other_storage.txt").exists()
# Verify storage contents are intact
assert (storage_dir / "storage.json").read_text() == '{"test": "data"}'
assert (storage_dir / "other_storage.txt").read_text() == "storage content"
# Verify other files were removed
assert not (build_dir / "dummy.txt").exists()
assert not (build_dir / "other_file.txt").exists()
# Verify logging mentions deletion
assert "Cleaning" in caplog.text
assert str(build_dir) in caplog.text
@patch("esphome.writer.CORE")
def test_clean_all_platformio_not_available(
mock_core: MagicMock,
@@ -833,8 +887,8 @@ def test_clean_all_platformio_not_available(
):
clean_all([str(config_dir)])
# Build dir removed, PlatformIO dirs remain
assert not build_dir.exists()
# Build dir contents cleaned, PlatformIO dirs remain
assert build_dir.exists()
assert pio_cache.exists()
# No PlatformIO-specific logs
@@ -866,4 +920,68 @@ def test_clean_all_partial_exists(
clean_all([str(config_dir)])
assert not build_dir.exists()
assert build_dir.exists()
@patch("esphome.writer.CORE")
def test_clean_all_removes_non_storage_directories(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_all removes directories other than storage."""
# Create build directory with various subdirectories
config_dir = tmp_path / "config"
config_dir.mkdir()
build_dir = config_dir / ".esphome"
build_dir.mkdir()
# Create files
(build_dir / "file1.txt").write_text("content1")
(build_dir / "file2.txt").write_text("content2")
# Create storage directory (should be preserved)
storage_dir = build_dir / "storage"
storage_dir.mkdir()
(storage_dir / "storage.json").write_text('{"test": "data"}')
# Create other directories (should be removed)
cache_dir = build_dir / "cache"
cache_dir.mkdir()
(cache_dir / "cache_file.txt").write_text("cache content")
logs_dir = build_dir / "logs"
logs_dir.mkdir()
(logs_dir / "log1.txt").write_text("log content")
temp_dir = build_dir / "temp"
temp_dir.mkdir()
(temp_dir / "temp_file.txt").write_text("temp content")
# Call clean_all
from esphome.writer import clean_all
with caplog.at_level("INFO"):
clean_all([str(config_dir)])
# Verify .esphome directory still exists
assert build_dir.exists()
# Verify storage directory and its contents are preserved
assert storage_dir.exists()
assert (storage_dir / "storage.json").exists()
assert (storage_dir / "storage.json").read_text() == '{"test": "data"}'
# Verify files were removed
assert not (build_dir / "file1.txt").exists()
assert not (build_dir / "file2.txt").exists()
# Verify non-storage directories were removed
assert not cache_dir.exists()
assert not logs_dir.exists()
assert not temp_dir.exists()
# Verify logging mentions cleaning
assert "Cleaning" in caplog.text
assert str(build_dir) in caplog.text