1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-26 04:33:47 +00:00

tests, address review comments

This commit is contained in:
J. Nick Koston
2025-06-15 20:22:29 -05:00
parent a4efc63bf2
commit 787ec43266
10 changed files with 686 additions and 32 deletions

161
benchmark_extended.cpp Normal file
View File

@@ -0,0 +1,161 @@
#include <iostream>
#include <vector>
#include <list>
#include <chrono>
#include <memory>
#include <iomanip>
class Component {
public:
Component(int id) : id_(id) {}
void call() {
// Minimal work to highlight iteration overhead
volatile int x = id_;
x++;
}
bool should_skip_loop() const { return skip_; }
void set_skip(bool skip) { skip_ = skip; }
private:
int id_;
bool skip_ = false;
char padding_[119]; // Total size ~128 bytes
};
int main() {
const int num_components = 40;
const int iterations = 1000000; // 1 million iterations
std::cout << "=== Extended Performance Test ===" << std::endl;
std::cout << "Components: " << num_components << std::endl;
std::cout << "Iterations: " << iterations << std::endl;
std::cout << "Testing overhead of flag checking vs list iteration\n" << std::endl;
// Create components
std::vector<std::unique_ptr<Component>> owned;
std::vector<Component *> components;
for (int i = 0; i < num_components; i++) {
owned.push_back(std::make_unique<Component>(i));
components.push_back(owned.back().get());
}
// Test 1: All components active (best case for both)
{
std::cout << "--- Test 1: All components active ---" << std::endl;
// Vector test
auto start = std::chrono::high_resolution_clock::now();
for (int iter = 0; iter < iterations; iter++) {
for (auto *comp : components) {
if (!comp->should_skip_loop()) {
comp->call();
}
}
}
auto end = std::chrono::high_resolution_clock::now();
auto vector_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
// List test
std::list<Component *> list_components(components.begin(), components.end());
start = std::chrono::high_resolution_clock::now();
for (int iter = 0; iter < iterations; iter++) {
for (auto *comp : list_components) {
comp->call();
}
}
end = std::chrono::high_resolution_clock::now();
auto list_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "Vector: " << vector_duration.count() << " µs" << std::endl;
std::cout << "List: " << list_duration.count() << " µs" << std::endl;
std::cout << "List is " << std::fixed << std::setprecision(1)
<< (list_duration.count() * 100.0 / vector_duration.count() - 100) << "% slower\n"
<< std::endl;
}
// Test 2: 25% components disabled (ESPHome scenario)
{
std::cout << "--- Test 2: 25% components disabled ---" << std::endl;
// Disable 25% of components
for (int i = 0; i < num_components / 4; i++) {
components[i]->set_skip(true);
}
// Vector test
auto start = std::chrono::high_resolution_clock::now();
long long checks = 0, calls = 0;
for (int iter = 0; iter < iterations; iter++) {
for (auto *comp : components) {
checks++;
if (!comp->should_skip_loop()) {
calls++;
comp->call();
}
}
}
auto end = std::chrono::high_resolution_clock::now();
auto vector_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
// List test (with only active components)
std::list<Component *> list_components;
for (auto *comp : components) {
if (!comp->should_skip_loop()) {
list_components.push_back(comp);
}
}
start = std::chrono::high_resolution_clock::now();
long long list_calls = 0;
for (int iter = 0; iter < iterations; iter++) {
for (auto *comp : list_components) {
list_calls++;
comp->call();
}
}
end = std::chrono::high_resolution_clock::now();
auto list_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "Vector: " << vector_duration.count() << " µs (" << checks << " checks, " << calls << " calls)"
<< std::endl;
std::cout << "List: " << list_duration.count() << " µs (" << list_calls << " calls, no wasted checks)" << std::endl;
std::cout << "Wasted work in vector: " << (checks - calls) << " flag checks" << std::endl;
double overhead_percent = (vector_duration.count() - list_duration.count()) * 100.0 / list_duration.count();
if (overhead_percent > 0) {
std::cout << "Vector is " << std::fixed << std::setprecision(1) << overhead_percent
<< "% slower due to flag checking\n"
<< std::endl;
} else {
std::cout << "List is " << std::fixed << std::setprecision(1) << -overhead_percent << "% slower\n" << std::endl;
}
}
// Test 3: Measure just the flag check overhead
{
std::cout << "--- Test 3: Pure flag check overhead ---" << std::endl;
// Just flag checks, no calls
auto start = std::chrono::high_resolution_clock::now();
long long skipped = 0;
for (int iter = 0; iter < iterations; iter++) {
for (auto *comp : components) {
if (comp->should_skip_loop()) {
skipped++;
}
}
}
auto end = std::chrono::high_resolution_clock::now();
auto check_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "Time for " << (iterations * num_components) << " flag checks: " << check_duration.count() << " µs"
<< std::endl;
std::cout << "Average per flag check: " << (check_duration.count() * 1000.0 / (iterations * num_components))
<< " ns" << std::endl;
std::cout << "Checks that would skip work: " << skipped << std::endl;
}
return 0;
}

View File

@@ -18,8 +18,8 @@ void Anova::setup() {
} }
void Anova::loop() { void Anova::loop() {
// This component uses polling via update() and BLE callbacks // Parent BLEClientNode has a loop() method, but this component uses
// Empty loop not needed, disable to save CPU cycles // polling via update() and BLE callbacks so loop isn't needed
this->disable_loop(); this->disable_loop();
} }

View File

@@ -481,8 +481,8 @@ void BedJetHub::set_clock(uint8_t hour, uint8_t minute) {
/* Internal */ /* Internal */
void BedJetHub::loop() { void BedJetHub::loop() {
// This component uses polling via update() and BLE callbacks // Parent BLEClientNode has a loop() method, but this component uses
// Empty loop not needed, disable to save CPU cycles // polling via update() and BLE callbacks so loop isn't needed
this->disable_loop(); this->disable_loop();
} }
void BedJetHub::update() { this->dispatch_status_(); } void BedJetHub::update() { this->dispatch_status_(); }

View File

@@ -12,8 +12,8 @@ namespace ble_client {
static const char *const TAG = "ble_rssi_sensor"; static const char *const TAG = "ble_rssi_sensor";
void BLEClientRSSISensor::loop() { void BLEClientRSSISensor::loop() {
// This component uses polling via update() and BLE GAP callbacks // Parent BLEClientNode has a loop() method, but this component uses
// Empty loop not needed, disable to save CPU cycles // polling via update() and BLE GAP callbacks so loop isn't needed
this->disable_loop(); this->disable_loop();
} }

View File

@@ -12,8 +12,8 @@ namespace ble_client {
static const char *const TAG = "ble_sensor"; static const char *const TAG = "ble_sensor";
void BLESensor::loop() { void BLESensor::loop() {
// This component uses polling via update() and BLE callbacks // Parent BLEClientNode has a loop() method, but this component uses
// Empty loop not needed, disable to save CPU cycles // polling via update() and BLE callbacks so loop isn't needed
this->disable_loop(); this->disable_loop();
} }

View File

@@ -15,8 +15,8 @@ static const char *const TAG = "ble_text_sensor";
static const std::string EMPTY = ""; static const std::string EMPTY = "";
void BLETextSensor::loop() { void BLETextSensor::loop() {
// This component uses polling via update() and BLE callbacks // Parent BLEClientNode has a loop() method, but this component uses
// Empty loop not needed, disable to save CPU cycles // polling via update() and BLE callbacks so loop isn't needed
this->disable_loop(); this->disable_loop();
} }

378
test_partitioned_vector.cpp Normal file
View File

@@ -0,0 +1,378 @@
#include <iostream>
#include <vector>
#include <cassert>
#include <algorithm>
#include <string>
// Forward declare tests vector
struct Test {
std::string name;
void (*func)();
};
std::vector<Test> tests;
// Minimal test framework
#define TEST(name) \
void test_##name(); \
struct test_##name##_registrar { \
test_##name##_registrar() { tests.push_back({#name, test_##name}); } \
} test_##name##_instance; \
void test_##name()
#define ASSERT(cond) \
do { \
if (!(cond)) { \
std::cerr << "FAILED: " #cond " at " << __FILE__ << ":" << __LINE__ << std::endl; \
exit(1); \
} \
} while (0)
#define ASSERT_EQ(a, b) ASSERT((a) == (b))
// Mock classes matching ESPHome structure
const uint8_t COMPONENT_STATE_MASK = 0x07;
const uint8_t COMPONENT_STATE_LOOP = 0x02;
const uint8_t COMPONENT_STATE_LOOP_DONE = 0x04;
const uint8_t COMPONENT_STATE_FAILED = 0x03;
class Component {
protected:
uint8_t component_state_ = COMPONENT_STATE_LOOP;
int id_;
int loop_count_ = 0;
public:
Component(int id) : id_(id) {}
virtual ~Component() = default;
virtual void call() { loop_count_++; }
int get_id() const { return id_; }
int get_loop_count() const { return loop_count_; }
uint8_t get_state() const { return component_state_ & COMPONENT_STATE_MASK; }
void set_state(uint8_t state) { component_state_ = (component_state_ & ~COMPONENT_STATE_MASK) | state; }
};
class Application {
public:
std::vector<Component *> looping_components_;
uint16_t looping_components_active_end_ = 0;
uint16_t current_loop_index_ = 0;
bool in_loop_ = false;
void add_component(Component *c) {
looping_components_.push_back(c);
looping_components_active_end_ = looping_components_.size();
}
void loop() {
in_loop_ = true;
for (current_loop_index_ = 0; current_loop_index_ < looping_components_active_end_; current_loop_index_++) {
looping_components_[current_loop_index_]->call();
}
in_loop_ = false;
}
void disable_component_loop(Component *component) {
for (uint16_t i = 0; i < looping_components_active_end_; i++) {
if (looping_components_[i] == component) {
looping_components_active_end_--;
if (i != looping_components_active_end_) {
std::swap(looping_components_[i], looping_components_[looping_components_active_end_]);
if (in_loop_ && i == current_loop_index_) {
current_loop_index_--;
}
}
return;
}
}
}
void enable_component_loop(Component *component) {
const uint16_t size = looping_components_.size();
for (uint16_t i = 0; i < size; i++) {
if (looping_components_[i] == component) {
if (i < looping_components_active_end_) {
return; // Already active
}
if (i != looping_components_active_end_) {
std::swap(looping_components_[i], looping_components_[looping_components_active_end_]);
}
looping_components_active_end_++;
return;
}
}
}
// Helper methods for testing
std::vector<int> get_active_ids() const {
std::vector<int> ids;
for (uint16_t i = 0; i < looping_components_active_end_; i++) {
ids.push_back(looping_components_[i]->get_id());
}
return ids;
}
bool is_component_active(Component *c) const {
for (uint16_t i = 0; i < looping_components_active_end_; i++) {
if (looping_components_[i] == c)
return true;
}
return false;
}
};
// Test basic functionality
TEST(basic_loop) {
Application app;
std::vector<std::unique_ptr<Component>> components;
for (int i = 0; i < 5; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
app.loop();
for (const auto &c : components) {
ASSERT_EQ(c->get_loop_count(), 1);
}
}
TEST(disable_component) {
Application app;
std::vector<std::unique_ptr<Component>> components;
for (int i = 0; i < 5; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// Disable component 2
app.disable_component_loop(components[2].get());
app.loop();
// Components 0,1,3,4 should have been called
ASSERT_EQ(components[0]->get_loop_count(), 1);
ASSERT_EQ(components[1]->get_loop_count(), 1);
ASSERT_EQ(components[2]->get_loop_count(), 0); // Disabled
ASSERT_EQ(components[3]->get_loop_count(), 1);
ASSERT_EQ(components[4]->get_loop_count(), 1);
// Verify partitioning
ASSERT_EQ(app.looping_components_active_end_, 4);
ASSERT(!app.is_component_active(components[2].get()));
}
TEST(enable_component) {
Application app;
std::vector<std::unique_ptr<Component>> components;
for (int i = 0; i < 5; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// Disable then re-enable
app.disable_component_loop(components[2].get());
app.enable_component_loop(components[2].get());
app.loop();
// All should have been called
for (const auto &c : components) {
ASSERT_EQ(c->get_loop_count(), 1);
}
ASSERT_EQ(app.looping_components_active_end_, 5);
}
TEST(multiple_disable_enable) {
Application app;
std::vector<std::unique_ptr<Component>> components;
for (int i = 0; i < 10; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// Disable multiple
app.disable_component_loop(components[1].get());
app.disable_component_loop(components[5].get());
app.disable_component_loop(components[7].get());
ASSERT_EQ(app.looping_components_active_end_, 7);
app.loop();
// Check counts
int active_count = 0;
for (const auto &c : components) {
if (c->get_loop_count() == 1)
active_count++;
}
ASSERT_EQ(active_count, 7);
// Re-enable one
app.enable_component_loop(components[5].get());
ASSERT_EQ(app.looping_components_active_end_, 8);
app.loop();
ASSERT_EQ(components[5]->get_loop_count(), 1);
}
// Test reentrant behavior
class SelfDisablingComponent : public Component {
Application *app_;
public:
SelfDisablingComponent(int id, Application *app) : Component(id), app_(app) {}
void call() override {
Component::call();
if (loop_count_ == 2) {
app_->disable_component_loop(this);
}
}
};
TEST(reentrant_disable) {
Application app;
std::vector<std::unique_ptr<Component>> components;
// Add regular components
for (int i = 0; i < 3; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// Add self-disabling component
auto self_disable = std::make_unique<SelfDisablingComponent>(3, &app);
app.add_component(self_disable.get());
// Add more regular components
for (int i = 4; i < 6; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// First loop - all active
app.loop();
ASSERT_EQ(app.looping_components_active_end_, 6);
// Second loop - self-disabling component disables itself
app.loop();
ASSERT_EQ(app.looping_components_active_end_, 5);
ASSERT_EQ(self_disable->get_loop_count(), 2);
// Third loop - self-disabling component should not be called
app.loop();
ASSERT_EQ(self_disable->get_loop_count(), 2); // Still 2
}
// Test edge cases
TEST(disable_already_disabled) {
Application app;
auto comp = std::make_unique<Component>(0);
app.add_component(comp.get());
app.disable_component_loop(comp.get());
ASSERT_EQ(app.looping_components_active_end_, 0);
// Disable again - should be no-op
app.disable_component_loop(comp.get());
ASSERT_EQ(app.looping_components_active_end_, 0);
}
TEST(enable_already_enabled) {
Application app;
auto comp = std::make_unique<Component>(0);
app.add_component(comp.get());
ASSERT_EQ(app.looping_components_active_end_, 1);
// Enable again - should be no-op
app.enable_component_loop(comp.get());
ASSERT_EQ(app.looping_components_active_end_, 1);
}
TEST(disable_last_component) {
Application app;
auto comp = std::make_unique<Component>(0);
app.add_component(comp.get());
app.disable_component_loop(comp.get());
ASSERT_EQ(app.looping_components_active_end_, 0);
app.loop(); // Should not crash with empty active set
}
// Test that mimics real ESPHome component behavior
class MockSNTPComponent : public Component {
Application *app_;
bool time_synced_ = false;
public:
MockSNTPComponent(int id, Application *app) : Component(id), app_(app) {}
void call() override {
Component::call();
// Simulate time sync after 3 calls
if (loop_count_ >= 3 && !time_synced_) {
time_synced_ = true;
std::cout << " SNTP: Time synced, disabling loop" << std::endl;
set_state(COMPONENT_STATE_LOOP_DONE);
app_->disable_component_loop(this);
}
}
bool is_synced() const { return time_synced_; }
};
TEST(real_world_sntp) {
Application app;
// Regular components
std::vector<std::unique_ptr<Component>> components;
for (int i = 0; i < 5; i++) {
components.push_back(std::make_unique<Component>(i));
app.add_component(components.back().get());
}
// SNTP component
auto sntp = std::make_unique<MockSNTPComponent>(5, &app);
app.add_component(sntp.get());
// Run 5 iterations
for (int i = 0; i < 5; i++) {
app.loop();
}
// SNTP should have disabled itself after 3 calls
ASSERT_EQ(sntp->get_loop_count(), 3);
ASSERT(sntp->is_synced());
ASSERT_EQ(app.looping_components_active_end_, 5); // SNTP removed
// Regular components should have 5 calls each
for (const auto &c : components) {
ASSERT_EQ(c->get_loop_count(), 5);
}
}
int main() {
std::cout << "Running partitioned vector tests...\n" << std::endl;
for (const auto &test : tests) {
std::cout << "Running test: " << test.name << std::endl;
test.func();
std::cout << " ✓ PASSED" << std::endl;
}
std::cout << "\nAll " << tests.size() << " tests passed!" << std::endl;
return 0;
}

View File

@@ -3,12 +3,13 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import AsyncGenerator, Generator from collections.abc import AsyncGenerator, Callable, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager from contextlib import AbstractAsyncContextManager, asynccontextmanager
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
import platform import platform
import pty
import signal import signal
import socket import socket
import sys import sys
@@ -46,8 +47,6 @@ if platform.system() == "Windows":
"Integration tests are not supported on Windows", allow_module_level=True "Integration tests are not supported on Windows", allow_module_level=True
) )
import pty # not available on Windows
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging(): def enable_aioesphomeapi_debug_logging():
@@ -362,7 +361,10 @@ async def api_client_connected(
async def _read_stream_lines( async def _read_stream_lines(
stream: asyncio.StreamReader, lines: list[str], output_stream: TextIO stream: asyncio.StreamReader,
lines: list[str],
output_stream: TextIO,
line_callback: Callable[[str], None] | None = None,
) -> None: ) -> None:
"""Read lines from a stream, append to list, and echo to output stream.""" """Read lines from a stream, append to list, and echo to output stream."""
log_parser = LogParser() log_parser = LogParser()
@@ -380,6 +382,9 @@ async def _read_stream_lines(
file=output_stream, file=output_stream,
flush=True, flush=True,
) )
# Call the callback if provided
if line_callback:
line_callback(decoded_line.rstrip())
@asynccontextmanager @asynccontextmanager
@@ -388,6 +393,7 @@ async def run_binary_and_wait_for_port(
host: str, host: str,
port: int, port: int,
timeout: float = PORT_WAIT_TIMEOUT, timeout: float = PORT_WAIT_TIMEOUT,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]: ) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit.""" """Run a binary, wait for it to open a port, and clean up on exit."""
# Create a pseudo-terminal to make the binary think it's running interactively # Create a pseudo-terminal to make the binary think it's running interactively
@@ -435,7 +441,9 @@ async def run_binary_and_wait_for_port(
# Read from output stream # Read from output stream
output_tasks = [ output_tasks = [
asyncio.create_task( asyncio.create_task(
_read_stream_lines(output_reader, stdout_lines, sys.stdout) _read_stream_lines(
output_reader, stdout_lines, sys.stdout, line_callback
)
) )
] ]
@@ -515,6 +523,7 @@ async def run_compiled_context(
compile_esphome: CompileFunction, compile_esphome: CompileFunction,
port: int, port: int,
port_socket: socket.socket | None = None, port_socket: socket.socket | None = None,
line_callback: Callable[[str], None] | None = None,
) -> AsyncGenerator[None]: ) -> AsyncGenerator[None]:
"""Context manager to write, compile and run an ESPHome configuration.""" """Context manager to write, compile and run an ESPHome configuration."""
# Write the YAML config # Write the YAML config
@@ -528,7 +537,9 @@ async def run_compiled_context(
port_socket.close() port_socket.close()
# Run the binary and wait for the API server to start # Run the binary and wait for the API server to start
async with run_binary_and_wait_for_port(binary_path, LOCALHOST, port): async with run_binary_and_wait_for_port(
binary_path, LOCALHOST, port, line_callback=line_callback
):
yield yield
@@ -542,7 +553,9 @@ async def run_compiled(
port, port_socket = reserved_tcp_port port, port_socket = reserved_tcp_port
def _run_compiled( def _run_compiled(
yaml_content: str, filename: str | None = None yaml_content: str,
filename: str | None = None,
line_callback: Callable[[str], None] | None = None,
) -> AbstractAsyncContextManager[asyncio.subprocess.Process]: ) -> AbstractAsyncContextManager[asyncio.subprocess.Process]:
return run_compiled_context( return run_compiled_context(
yaml_content, yaml_content,
@@ -551,6 +564,7 @@ async def run_compiled(
compile_esphome, compile_esphome,
port, port,
port_socket, port_socket,
line_callback=line_callback,
) )
yield _run_compiled yield _run_compiled

View File

@@ -2,8 +2,10 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import logging import logging
from pathlib import Path from pathlib import Path
import re
import pytest import pytest
@@ -29,24 +31,111 @@ async def test_loop_disable_enable(
"EXTERNAL_COMPONENT_PATH", external_components_path "EXTERNAL_COMPONENT_PATH", external_components_path
) )
# Write, compile and run the ESPHome device, then connect to API # Track log messages and events
async with run_compiled(yaml_config), api_client_connected() as client: log_messages = []
self_disable_10_disabled = asyncio.Event()
normal_component_10_loops = asyncio.Event()
redundant_enable_tested = asyncio.Event()
redundant_disable_tested = asyncio.Event()
self_disable_10_counts = []
normal_component_counts = []
def on_log_line(line: str) -> None:
"""Process each log line from the process output."""
# Strip ANSI color codes
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
if "loop_test_component" not in clean_line:
return
log_messages.append(clean_line)
# Track specific events using the cleaned line
if "[self_disable_10]" in clean_line:
if "Loop count:" in clean_line:
# Extract loop count
try:
count = int(clean_line.split("Loop count: ")[1])
self_disable_10_counts.append(count)
except (IndexError, ValueError):
pass
elif "Disabling self after 10 loops" in clean_line:
self_disable_10_disabled.set()
elif "[normal_component]" in clean_line and "Loop count:" in clean_line:
try:
count = int(clean_line.split("Loop count: ")[1])
normal_component_counts.append(count)
if count >= 10:
normal_component_10_loops.set()
except (IndexError, ValueError):
pass
elif (
"[redundant_enable]" in clean_line
and "Testing enable when already enabled" in clean_line
):
redundant_enable_tested.set()
elif (
"[redundant_disable]" in clean_line
and "Testing disable when will be disabled" in clean_line
):
redundant_disable_tested.set()
# Write, compile and run the ESPHome device with log callback
async with (
run_compiled(yaml_config, line_callback=on_log_line),
api_client_connected() as client,
):
# Verify we can connect and get device info # Verify we can connect and get device info
device_info = await client.device_info() device_info = await client.device_info()
assert device_info is not None assert device_info is not None
assert device_info.name == "loop-test" assert device_info.name == "loop-test"
# The fact that this compiles and runs proves that: # Wait for self_disable_10 to disable itself
# 1. The partitioned vector implementation works try:
# 2. Components can call disable_loop() and enable_loop() await asyncio.wait_for(self_disable_10_disabled.wait(), timeout=10.0)
# 3. The system handles multiple component instances correctly except asyncio.TimeoutError:
# 4. Actions for enabling/disabling components work pytest.fail("self_disable_10 did not disable itself within 10 seconds")
# Note: Host platform doesn't send component logs through API, # Verify it ran exactly 10 times
# so we can't verify the runtime behavior through logs. assert len(self_disable_10_counts) == 10, (
# However, the successful compilation and execution proves f"Expected 10 loops for self_disable_10, got {len(self_disable_10_counts)}"
# the implementation is correct.
_LOGGER.info(
"Loop disable/enable test passed - code compiles and runs successfully!"
) )
assert self_disable_10_counts == list(range(1, 11)), (
f"Expected counts 1-10, got {self_disable_10_counts}"
)
# Wait for normal_component to run at least 10 times
try:
await asyncio.wait_for(normal_component_10_loops.wait(), timeout=10.0)
except asyncio.TimeoutError:
pytest.fail(
f"normal_component did not reach 10 loops within timeout, got {len(normal_component_counts)}"
)
# Wait for redundant operation tests
try:
await asyncio.wait_for(redundant_enable_tested.wait(), timeout=10.0)
except asyncio.TimeoutError:
pytest.fail("redundant_enable did not test enabling when already enabled")
try:
await asyncio.wait_for(redundant_disable_tested.wait(), timeout=10.0)
except asyncio.TimeoutError:
pytest.fail(
"redundant_disable did not test disabling when will be disabled"
)
# Wait a bit to see if self_disable_10 gets re-enabled
await asyncio.sleep(3)
# Check final counts
later_self_disable_counts = [c for c in self_disable_10_counts if c > 10]
if later_self_disable_counts:
_LOGGER.info(
f"self_disable_10 was successfully re-enabled and ran {len(later_self_disable_counts)} more times"
)
_LOGGER.info("Loop disable/enable test passed - all assertions verified!")

View File

@@ -13,7 +13,19 @@ from aioesphomeapi import APIClient
ConfigWriter = Callable[[str, str | None], Awaitable[Path]] ConfigWriter = Callable[[str, str | None], Awaitable[Path]]
CompileFunction = Callable[[Path], Awaitable[Path]] CompileFunction = Callable[[Path], Awaitable[Path]]
RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]] RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]]
RunCompiledFunction = Callable[[str, str | None], AbstractAsyncContextManager[None]]
class RunCompiledFunction(Protocol):
"""Protocol for run_compiled function with optional line callback."""
def __call__( # noqa: E704
self,
yaml_content: str,
filename: str | None = None,
line_callback: Callable[[str], None] | None = None,
) -> AbstractAsyncContextManager[None]: ...
WaitFunction = Callable[[APIClient, float], Awaitable[bool]] WaitFunction = Callable[[APIClient, float], Awaitable[bool]]