diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp
index 6269a66543..c3ade260ac 100644
--- a/esphome/core/scheduler.cpp
+++ b/esphome/core/scheduler.cpp
@@ -82,7 +82,13 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   item->set_name(name_cstr, !is_static_string);
   item->type = type;
   item->callback = std::move(func);
+  // Initialize remove to false (though it should already be false from the constructor)
+  // Not using the mark_item_removed_ helper since we're setting to false, not true
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+  item->remove.store(false, std::memory_order_relaxed);
+#else
   item->remove = false;
+#endif
   item->is_retry = is_retry;

 #ifndef ESPHOME_THREAD_SINGLE
@@ -398,6 +404,31 @@ void HOT Scheduler::call(uint32_t now) {
         this->pop_raw_();
         continue;
       }
+
+      // Check if item is marked for removal
+      // This handles two cases:
+      // 1. Item was marked for removal after cleanup_() but before we got here
+      // 2. Item is marked for removal but wasn't at the front of the heap during cleanup_()
+#ifdef ESPHOME_THREAD_MULTI_NO_ATOMICS
+      // Multi-threaded platforms without atomics: must take the lock to safely read the remove flag
+      {
+        LockGuard guard{this->lock_};
+        if (is_item_removed_(item.get())) {
+          this->pop_raw_();
+          this->to_remove_--;
+          continue;
+        }
+      }
+#else
+      // Single-threaded or multi-threaded with atomics: can check without the lock
+      if (is_item_removed_(item.get())) {
+        LockGuard guard{this->lock_};
+        this->pop_raw_();
+        this->to_remove_--;
+        continue;
+      }
+#endif
+
 #ifdef ESPHOME_DEBUG_SCHEDULER
       const char *item_name = item->get_name();
       ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")",
@@ -518,7 +549,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   if (type == SchedulerItem::TIMEOUT) {
     for (auto &item : this->defer_queue_) {
       if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-        item->remove = true;
+        this->mark_item_removed_(item.get());
         total_cancelled++;
       }
     }
@@ -528,7 +559,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   // Cancel items in the main heap
   for (auto &item : this->items_) {
     if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      item->remove = true;
+      this->mark_item_removed_(item.get());
       total_cancelled++;
       this->to_remove_++;  // Track removals for heap items
     }
@@ -537,7 +568,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   // Cancel items in to_add_
   for (auto &item : this->to_add_) {
     if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      item->remove = true;
+      this->mark_item_removed_(item.get());
       total_cancelled++;
       // Don't track removals for to_add_ items
     }
diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h
index a6092e1b1e..c73bd55d5d 100644
--- a/esphome/core/scheduler.h
+++ b/esphome/core/scheduler.h
@@ -97,22 +97,42 @@ class Scheduler {

     std::function<void()> callback;

-    // Bit-packed fields to minimize padding
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use an atomic for lock-free access
+    // Placed separately since an atomic can't be packed with bit fields
+    std::atomic<bool> remove{false};
+
+    // Bit-packed fields (3 bits used, 5 bits padding in 1 byte)
+    enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
+    bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
+    bool is_retry : 1;         // True if this is a retry timeout
+    // 5 bits padding
+#else
+    // Single-threaded or multi-threaded without atomics: can pack all fields together
+    // Bit-packed fields (4 bits used, 4 bits padding in 1 byte)
     enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
     bool remove : 1;
     bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
     bool is_retry : 1;         // True if this is a retry timeout
-    // 4 bits padding
+    // 4 bits padding
+#endif

     // Constructor
     SchedulerItem()
         : component(nullptr),
           interval(0),
           next_execution_(0),
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+          // remove is initialized in the member declaration as std::atomic<bool>{false}
+          type(TIMEOUT),
+          name_is_dynamic(false),
+          is_retry(false) {
+#else
           type(TIMEOUT),
           remove(false),
           name_is_dynamic(false),
           is_retry(false) {
+#endif
       name_.static_name = nullptr;
     }

@@ -219,6 +239,37 @@
     return item->remove || (item->component != nullptr && item->component->is_failed());
   }
+
+  // Helper to check if item is marked for removal (platform-specific)
+  // Returns true if the item should be skipped; handles platform-specific synchronization
+  // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
+  // function.
+  bool is_item_removed_(SchedulerItem *item) const {
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use an atomic load for lock-free access
+    return item->remove.load(std::memory_order_acquire);
+#else
+    // Single-threaded (ESPHOME_THREAD_SINGLE) or
+    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct read
+    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold the lock!
+    return item->remove;
+#endif
+  }
+
+  // Helper to mark item for removal (platform-specific)
+  // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
+  // function.
+  void mark_item_removed_(SchedulerItem *item) {
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use an atomic store
+    item->remove.store(true, std::memory_order_release);
+#else
+    // Single-threaded (ESPHOME_THREAD_SINGLE) or
+    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct write
+    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold the lock!
+    item->remove = true;
+#endif
+  }
+
   // Template helper to check if any item in a container matches our criteria
   template<typename Container>
   bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
diff --git a/tests/integration/fixtures/scheduler_removed_item_race.yaml b/tests/integration/fixtures/scheduler_removed_item_race.yaml
new file mode 100644
index 0000000000..2f8a7fb987
--- /dev/null
+++ b/tests/integration/fixtures/scheduler_removed_item_race.yaml
@@ -0,0 +1,139 @@
+esphome:
+  name: scheduler-removed-item-race
+
+host:
+
+api:
+  services:
+    - service: run_test
+      then:
+        - script.execute: run_test_script
+
+logger:
+  level: DEBUG
+
+globals:
+  - id: test_passed
+    type: bool
+    initial_value: 'true'
+  - id: removed_item_executed
+    type: int
+    initial_value: '0'
+  - id: normal_item_executed
+    type: int
+    initial_value: '0'
+
+sensor:
+  - platform: template
+    id: test_sensor
+    name: "Test Sensor"
+    update_interval: never
+    lambda: return 0.0;
+
+script:
+  - id: run_test_script
+    then:
+      - logger.log: "=== Starting Removed Item Race Test ==="
+
+      # This test creates a scenario where:
+      # 1. The first item in the heap is NOT cancelled (cleanup stops immediately)
+      # 2. Items behind it ARE cancelled (remain in heap after cleanup)
+      # 3. All items execute at the same time, including the cancelled ones
+      - lambda: |-
+          // The key to hitting the race:
+          // 1. Add items in a specific order to control heap structure
+          // 2. Cancel ONLY items that won't be at the front
+          // 3. Ensure the first item stays non-cancelled so cleanup_() stops immediately
+
+          // Schedule all items to execute at the SAME time (1ms from now)
+          // Using 1ms instead of 0 to avoid the defer queue on multi-core platforms
+          // This ensures they'll all be ready together and go through the heap
+          const uint32_t exec_time = 1;
+
+          // CRITICAL: Add a non-cancellable item FIRST
+          // This will be at the front of the heap and block cleanup_()
+          App.scheduler.set_timeout(id(test_sensor), "blocker", exec_time, []() {
+            ESP_LOGD("test", "Blocker timeout executed (expected) - was at front of heap");
+            id(normal_item_executed)++;
+          });
+
+          // Now add items that we WILL cancel
+          // These will be behind the blocker in the heap
+          App.scheduler.set_timeout(id(test_sensor), "cancel_1", exec_time, []() {
+            ESP_LOGE("test", "RACE: Cancelled timeout 1 executed after being cancelled!");
+            id(removed_item_executed)++;
+            id(test_passed) = false;
+          });
+
+          App.scheduler.set_timeout(id(test_sensor), "cancel_2", exec_time, []() {
+            ESP_LOGE("test", "RACE: Cancelled timeout 2 executed after being cancelled!");
+            id(removed_item_executed)++;
+            id(test_passed) = false;
+          });
+
+          App.scheduler.set_timeout(id(test_sensor), "cancel_3", exec_time, []() {
+            ESP_LOGE("test", "RACE: Cancelled timeout 3 executed after being cancelled!");
+            id(removed_item_executed)++;
+            id(test_passed) = false;
+          });
+
+          // Add some more normal items
+          App.scheduler.set_timeout(id(test_sensor), "normal_1", exec_time, []() {
+            ESP_LOGD("test", "Normal timeout 1 executed (expected)");
+            id(normal_item_executed)++;
+          });
+
+          App.scheduler.set_timeout(id(test_sensor), "normal_2", exec_time, []() {
+            ESP_LOGD("test", "Normal timeout 2 executed (expected)");
+            id(normal_item_executed)++;
+          });
+
+          App.scheduler.set_timeout(id(test_sensor), "normal_3", exec_time, []() {
+            ESP_LOGD("test", "Normal timeout 3 executed (expected)");
+            id(normal_item_executed)++;
+          });
+
+          // Force items into the heap before cancelling
+          App.scheduler.process_to_add();
+
+          // NOW cancel the items - they're behind "blocker" in the heap
+          // When cleanup_() runs, it will see "blocker" (not removed) at the front
+          // and stop immediately, leaving cancel_1, cancel_2, cancel_3 in the heap
+          bool c1 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_1");
+          bool c2 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_2");
+          bool c3 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_3");
+
+          ESP_LOGD("test", "Cancelled items (behind blocker): %s, %s, %s",
+                   c1 ? "true" : "false",
+                   c2 ? "true" : "false",
+                   c3 ? "true" : "false");
+
+          // The heap now has:
+          // - "blocker" at the front (not cancelled)
+          // - cancelled items behind it (marked remove=true but still in the heap)
+          // - When all execute at once, cleanup_() stops at "blocker"
+          // - The loop then executes ALL ready items, including the cancelled ones
+
+          ESP_LOGD("test", "Setup complete. Blocker at front prevents cleanup of cancelled items behind it");
+
+      # Wait for all timeouts to execute (or not)
+      - delay: 20ms
+
+      # Check results
+      - lambda: |-
+          ESP_LOGI("test", "=== Test Results ===");
+          ESP_LOGI("test", "Normal items executed: %d (expected 4)", id(normal_item_executed));
+          ESP_LOGI("test", "Removed items executed: %d (expected 0)", id(removed_item_executed));
+
+          if (id(removed_item_executed) > 0) {
+            ESP_LOGE("test", "TEST FAILED: %d cancelled items were executed!", id(removed_item_executed));
+            id(test_passed) = false;
+          } else if (id(normal_item_executed) != 4) {
+            ESP_LOGE("test", "TEST FAILED: Expected 4 normal items, got %d", id(normal_item_executed));
+            id(test_passed) = false;
+          } else {
+            ESP_LOGI("test", "TEST PASSED: No cancelled items were executed");
+          }
+
+          ESP_LOGI("test", "=== Test Complete ===");
diff --git a/tests/integration/test_scheduler_removed_item_race.py b/tests/integration/test_scheduler_removed_item_race.py
new file mode 100644
index 0000000000..3e72bacc0d
--- /dev/null
+++ b/tests/integration/test_scheduler_removed_item_race.py
@@ -0,0 +1,102 @@
+"""Test for scheduler race condition where removed items still execute."""
+
+import asyncio
+import re
+
+import pytest
+
+from .types import APIClientConnectedFactory, RunCompiledFunction
+
+
+@pytest.mark.asyncio
+async def test_scheduler_removed_item_race(
+    yaml_config: str,
+    run_compiled: RunCompiledFunction,
+    api_client_connected: APIClientConnectedFactory,
+) -> None:
+    """Test that items marked for removal don't execute.
+
+    This test verifies the fix for a race condition where:
+    1. cleanup_() only removes items from the front of the heap
+    2. Items in the middle of the heap marked for removal still execute
+    3. This causes cancelled timeouts to run when they shouldn't
+    """
+
+    loop = asyncio.get_running_loop()
+    test_complete_future: asyncio.Future[bool] = loop.create_future()
+
+    # Track test results
+    test_passed = False
+    removed_executed = 0
+    normal_executed = 0
+
+    # Patterns to match
+    race_pattern = re.compile(r"RACE: .* executed after being cancelled!")
+    passed_pattern = re.compile(r"TEST PASSED")
+    failed_pattern = re.compile(r"TEST FAILED")
+    complete_pattern = re.compile(r"=== Test Complete ===")
+    normal_count_pattern = re.compile(r"Normal items executed: (\d+)")
+    removed_count_pattern = re.compile(r"Removed items executed: (\d+)")
+
+    def check_output(line: str) -> None:
+        """Check log output for test results."""
+        nonlocal test_passed, removed_executed, normal_executed
+
+        if race_pattern.search(line):
+            # Race condition detected - a cancelled item executed
+            test_passed = False
+
+        if passed_pattern.search(line):
+            test_passed = True
+        elif failed_pattern.search(line):
+            test_passed = False
+
+        normal_match = normal_count_pattern.search(line)
+        if normal_match:
+            normal_executed = int(normal_match.group(1))
+
+        removed_match = removed_count_pattern.search(line)
+        if removed_match:
+            removed_executed = int(removed_match.group(1))
+
+        if not test_complete_future.done() and complete_pattern.search(line):
+            test_complete_future.set_result(True)
+
+    async with (
+        run_compiled(yaml_config, line_callback=check_output),
+        api_client_connected() as client,
+    ):
+        # Verify we can connect
+        device_info = await client.device_info()
+        assert device_info is not None
+        assert device_info.name == "scheduler-removed-item-race"
+
+        # List services
+        _, services = await asyncio.wait_for(
+            client.list_entities_services(), timeout=5.0
+        )
+
+        # Find the run_test service
+        run_test_service = next((s for s in services if s.name == "run_test"), None)
+        assert run_test_service is not None, "run_test service not found"
+
+        # Execute the test
+        client.execute_service(run_test_service, {})
+
+        # Wait for test completion
+        try:
+            await asyncio.wait_for(test_complete_future, timeout=5.0)
+        except TimeoutError:
+            pytest.fail("Test did not complete within timeout")
+
+        # Verify results
+        assert test_passed, (
+            f"Test failed! Removed items executed: {removed_executed}, "
+            f"Normal items executed: {normal_executed}"
+        )
+        assert removed_executed == 0, (
+            f"Cancelled items should not execute, but {removed_executed} did"
+        )
+        assert normal_executed == 4, (
+            f"Expected 4 normal items to execute, got {normal_executed}"
+        )