Mirror of https://github.com/esphome/esphome.git (synced 2025-09-01 19:02:18 +01:00)
[core] Fix scheduler race condition where cancelled items still execute (#10268)
Commit bb894c3e32 (parent c5858b7032), committed by Jesse Hills
@@ -82,7 +82,13 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   item->set_name(name_cstr, !is_static_string);
   item->type = type;
   item->callback = std::move(func);
+  // Initialize remove to false (though it should already be from constructor)
+  // Not using mark_item_removed_ helper since we're setting to false, not true
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+  item->remove.store(false, std::memory_order_relaxed);
+#else
   item->remove = false;
+#endif
   item->is_retry = is_retry;
 
 #ifndef ESPHOME_THREAD_SINGLE
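The relaxed store above resets a freshly (re)initialized item. A minimal standalone sketch of why relaxed ordering is plausible at this point, assuming the item is private to the scheduling thread until it is published under the scheduler lock (names and flow are illustrative, not ESPHome code):

// Sketch: why a relaxed reset can suffice for a recycled item (assumption:
// the item is private to this thread until published under the lock).
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>

struct Item {
  std::atomic<bool> remove{false};
};

std::mutex scheduler_lock;                  // stands in for this->lock_
std::vector<std::unique_ptr<Item>> to_add;  // stands in for this->to_add_

void schedule(std::unique_ptr<Item> recycled) {
  // No other thread can see 'recycled' yet, so no ordering is needed:
  recycled->remove.store(false, std::memory_order_relaxed);
  std::lock_guard<std::mutex> guard(scheduler_lock);
  to_add.push_back(std::move(recycled));  // publication point
}

int main() { schedule(std::make_unique<Item>()); }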
@@ -398,6 +404,31 @@ void HOT Scheduler::call(uint32_t now) {
       this->pop_raw_();
       continue;
     }
 
+    // Check if item is marked for removal
+    // This handles two cases:
+    // 1. Item was marked for removal after cleanup_() but before we got here
+    // 2. Item is marked for removal but wasn't at the front of the heap during cleanup_()
+#ifdef ESPHOME_THREAD_MULTI_NO_ATOMICS
+    // Multi-threaded platforms without atomics: must take lock to safely read remove flag
+    {
+      LockGuard guard{this->lock_};
+      if (is_item_removed_(item.get())) {
+        this->pop_raw_();
+        this->to_remove_--;
+        continue;
+      }
+    }
+#else
+    // Single-threaded or multi-threaded with atomics: can check without lock
+    if (is_item_removed_(item.get())) {
+      LockGuard guard{this->lock_};
+      this->pop_raw_();
+      this->to_remove_--;
+      continue;
+    }
+#endif
+
 #ifdef ESPHOME_DEBUG_SCHEDULER
     const char *item_name = item->get_name();
     ESP_LOGV(TAG, "Running %s '%s/%s' with interval=%" PRIu32 " next_execution=%" PRIu64 " (now=%" PRIu64 ")",
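To see why this re-check matters, here is a minimal standalone sketch of the failure mode (not ESPHome code; simplified Item and heap): cleanup only pops removed items from the front of a min-heap, so an item cancelled behind a live front entry survives until the execution loop, which must re-check the flag.

// Sketch: front-only cleanup vs. a cancelled item in the middle of the heap.
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <vector>

struct Item {
  int next_execution;
  bool remove;
  const char *name;
};

// Strict comparator for a min-heap on next_execution (soonest at the front).
static bool later(const Item &a, const Item &b) { return a.next_execution > b.next_execution; }

int main() {
  std::vector<Item> heap;
  for (Item it : {Item{1, false, "blocker"}, Item{1, false, "cancel_1"}, Item{1, false, "normal_1"}}) {
    heap.push_back(it);
    std::push_heap(heap.begin(), heap.end(), later);
  }

  // Cancel an item that is NOT at the front.
  for (Item &it : heap)
    if (std::strcmp(it.name, "cancel_1") == 0)
      it.remove = true;

  // cleanup_()-style pass: stops at the first non-removed front item,
  // so "cancel_1" stays in the heap.
  while (!heap.empty() && heap.front().remove) {
    std::pop_heap(heap.begin(), heap.end(), later);
    heap.pop_back();
  }

  // Execution pass: without the per-item re-check added above, the
  // cancelled item would run here.
  while (!heap.empty()) {
    std::pop_heap(heap.begin(), heap.end(), later);
    Item it = heap.back();
    heap.pop_back();
    if (it.remove) {
      std::printf("skipped %s (this is what the fix re-checks)\n", it.name);
      continue;
    }
    std::printf("executed %s\n", it.name);
  }
}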
@@ -518,7 +549,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   if (type == SchedulerItem::TIMEOUT) {
     for (auto &item : this->defer_queue_) {
       if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-        item->remove = true;
+        this->mark_item_removed_(item.get());
         total_cancelled++;
       }
     }
@@ -528,7 +559,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   // Cancel items in the main heap
   for (auto &item : this->items_) {
     if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      item->remove = true;
+      this->mark_item_removed_(item.get());
       total_cancelled++;
      this->to_remove_++;  // Track removals for heap items
     }
@@ -537,7 +568,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
   // Cancel items in to_add_
   for (auto &item : this->to_add_) {
     if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      item->remove = true;
+      this->mark_item_removed_(item.get());
       total_cancelled++;
       // Don't track removals for to_add_ items
     }
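The asymmetry across these three hunks (only heap cancellations bump to_remove_) can be summarized in a small sketch with assumed semantics; the real bookkeeping lives in cancel_item_locked_(), call(), and cleanup_().

// Minimal sketch (assumed semantics, not the real class): to_remove_ tracks
// cancelled items still physically inside the heap, so it is incremented only
// when a heap entry is marked removed and decremented whenever one is popped.
#include <cstddef>
#include <cstdio>

struct CounterSketch {
  size_t to_remove_ = 0;

  void cancel_heap_item() { to_remove_++; }       // items_: lingers until popped
  void cancel_queued_item() { /* defer_queue_/to_add_: filtered on drain, not counted */ }
  void pop_removed_heap_item() { to_remove_--; }  // call()/cleanup_() pop path
};

int main() {
  CounterSketch s;
  s.cancel_heap_item();       // e.g. cancel_timeout() hits an entry in items_
  s.cancel_queued_item();     // entry still in to_add_: no count
  s.pop_removed_heap_item();  // scheduler later pops the cancelled heap entry
  std::printf("pending heap garbage: %zu\n", s.to_remove_);  // 0: counter balanced
}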
@@ -97,22 +97,42 @@ class Scheduler {
 
     std::function<void()> callback;
 
-    // Bit-packed fields to minimize padding
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use atomic for lock-free access
+    // Place atomic<bool> separately since it can't be packed with bit fields
+    std::atomic<bool> remove{false};
+
+    // Bit-packed fields (3 bits used, 5 bits padding in 1 byte)
+    enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
+    bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
+    bool is_retry : 1;         // True if this is a retry timeout
+    // 5 bits padding
+#else
+    // Single-threaded or multi-threaded without atomics: can pack all fields together
+    // Bit-packed fields (4 bits used, 4 bits padding in 1 byte)
     enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
     bool remove : 1;
     bool name_is_dynamic : 1;  // True if name was dynamically allocated (needs delete[])
     bool is_retry : 1;         // True if this is a retry timeout
     // 4 bits padding
+#endif
 
     // Constructor
     SchedulerItem()
         : component(nullptr),
           interval(0),
           next_execution_(0),
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+          // remove is initialized in the member declaration as std::atomic<bool>{false}
+          type(TIMEOUT),
+          name_is_dynamic(false),
+          is_retry(false) {
+#else
           type(TIMEOUT),
           remove(false),
           name_is_dynamic(false),
           is_retry(false) {
+#endif
       name_.static_name = nullptr;
     }
 
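A small standalone sketch of the layout trade-off this hunk describes: std::atomic<bool> cannot be declared as a bit-field, so the atomic variant pays roughly one extra byte (exact sizes are implementation-defined; struct names here are illustrative, not ESPHome code).

// Sketch: std::atomic<bool> cannot join the packed bit-field byte.
#include <atomic>
#include <cstdint>
#include <cstdio>

struct PackedFlags {  // non-atomic variant: all four fields share one byte
  enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
  bool remove : 1;
  bool name_is_dynamic : 1;
  bool is_retry : 1;
  // 4 bits padding
};

struct AtomicFlags {  // atomic variant: remove lives outside the bit-field byte
  std::atomic<bool> remove{false};
  enum Type : uint8_t { TIMEOUT, INTERVAL } type : 1;
  bool name_is_dynamic : 1;
  bool is_retry : 1;
  // 5 bits padding
};

int main() {
  std::printf("packed: %zu byte(s), atomic: %zu byte(s)\n",
              sizeof(PackedFlags), sizeof(AtomicFlags));  // typically 1 vs 2
}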
@@ -219,6 +239,37 @@ class Scheduler {
     return item->remove || (item->component != nullptr && item->component->is_failed());
   }
 
+  // Helper to check if item is marked for removal (platform-specific)
+  // Returns true if item should be skipped, handles platform-specific synchronization
+  // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
+  // function.
+  bool is_item_removed_(SchedulerItem *item) const {
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use atomic load for lock-free access
+    return item->remove.load(std::memory_order_acquire);
+#else
+    // Single-threaded (ESPHOME_THREAD_SINGLE) or
+    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct read
+    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold lock!
+    return item->remove;
+#endif
+  }
+
+  // Helper to mark item for removal (platform-specific)
+  // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
+  // function.
+  void mark_item_removed_(SchedulerItem *item) {
+#ifdef ESPHOME_THREAD_MULTI_ATOMICS
+    // Multi-threaded with atomics: use atomic store
+    item->remove.store(true, std::memory_order_release);
+#else
+    // Single-threaded (ESPHOME_THREAD_SINGLE) or
+    // multi-threaded without atomics (ESPHOME_THREAD_MULTI_NO_ATOMICS): direct write
+    // For ESPHOME_THREAD_MULTI_NO_ATOMICS, caller MUST hold lock!
+    item->remove = true;
+#endif
+  }
+
   // Template helper to check if any item in a container matches our criteria
   template<typename Container>
   bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
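The memory_order_release store in mark_item_removed_ pairs with the memory_order_acquire load in is_item_removed_: once the reader observes true, the canceller's earlier writes are visible to it. A standalone sketch of that pairing (illustrative names and flow, not ESPHome code):

// Sketch of the acquire/release pairing used by the two helpers above.
#include <atomic>
#include <cstdio>
#include <thread>

static int cancel_reason = 0;  // plain data written before the flag
static std::atomic<bool> remove_flag{false};

void canceller() {
  cancel_reason = 42;  // ordinary write...
  // ...made visible by the release store: an acquire load that reads true
  // is guaranteed to also observe cancel_reason == 42
  remove_flag.store(true, std::memory_order_release);
}

void scheduler_loop() {
  // Pairs with the release store above; either thread may run first
  if (remove_flag.load(std::memory_order_acquire)) {
    std::printf("skip cancelled item (reason=%d)\n", cancel_reason);
  } else {
    std::printf("item not (yet) cancelled\n");
  }
}

int main() {
  std::thread t1(canceller), t2(scheduler_loop);
  t1.join();
  t2.join();
}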
tests/integration/fixtures/scheduler_removed_item_race.yaml (new file, 139 lines)
@@ -0,0 +1,139 @@
esphome:
  name: scheduler-removed-item-race

host:

api:
  services:
    - service: run_test
      then:
        - script.execute: run_test_script

logger:
  level: DEBUG

globals:
  - id: test_passed
    type: bool
    initial_value: 'true'
  - id: removed_item_executed
    type: int
    initial_value: '0'
  - id: normal_item_executed
    type: int
    initial_value: '0'

sensor:
  - platform: template
    id: test_sensor
    name: "Test Sensor"
    update_interval: never
    lambda: return 0.0;

script:
  - id: run_test_script
    then:
      - logger.log: "=== Starting Removed Item Race Test ==="

      # This test creates a scenario where:
      # 1. First item in heap is NOT cancelled (cleanup stops immediately)
      # 2. Items behind it ARE cancelled (remain in heap after cleanup)
      # 3. All items execute at the same time, including cancelled ones

      - lambda: |-
          // The key to hitting the race:
          // 1. Add items in a specific order to control heap structure
          // 2. Cancel ONLY items that won't be at the front
          // 3. Ensure the first item stays non-cancelled so cleanup_() stops immediately

          // Schedule all items to execute at the SAME time (1ms from now)
          // Using 1ms instead of 0 to avoid defer queue on multi-core platforms
          // This ensures they'll all be ready together and go through the heap
          const uint32_t exec_time = 1;

          // CRITICAL: Add a non-cancellable item FIRST
          // This will be at the front of the heap and block cleanup_()
          App.scheduler.set_timeout(id(test_sensor), "blocker", exec_time, []() {
            ESP_LOGD("test", "Blocker timeout executed (expected) - was at front of heap");
            id(normal_item_executed)++;
          });

          // Now add items that we WILL cancel
          // These will be behind the blocker in the heap
          App.scheduler.set_timeout(id(test_sensor), "cancel_1", exec_time, []() {
            ESP_LOGE("test", "RACE: Cancelled timeout 1 executed after being cancelled!");
            id(removed_item_executed)++;
            id(test_passed) = false;
          });

          App.scheduler.set_timeout(id(test_sensor), "cancel_2", exec_time, []() {
            ESP_LOGE("test", "RACE: Cancelled timeout 2 executed after being cancelled!");
            id(removed_item_executed)++;
            id(test_passed) = false;
          });

          App.scheduler.set_timeout(id(test_sensor), "cancel_3", exec_time, []() {
            ESP_LOGE("test", "RACE: Cancelled timeout 3 executed after being cancelled!");
            id(removed_item_executed)++;
            id(test_passed) = false;
          });

          // Add some more normal items
          App.scheduler.set_timeout(id(test_sensor), "normal_1", exec_time, []() {
            ESP_LOGD("test", "Normal timeout 1 executed (expected)");
            id(normal_item_executed)++;
          });

          App.scheduler.set_timeout(id(test_sensor), "normal_2", exec_time, []() {
            ESP_LOGD("test", "Normal timeout 2 executed (expected)");
            id(normal_item_executed)++;
          });

          App.scheduler.set_timeout(id(test_sensor), "normal_3", exec_time, []() {
            ESP_LOGD("test", "Normal timeout 3 executed (expected)");
            id(normal_item_executed)++;
          });

          // Force items into the heap before cancelling
          App.scheduler.process_to_add();

          // NOW cancel the items - they're behind "blocker" in the heap
          // When cleanup_() runs, it will see "blocker" (not removed) at the front
          // and stop immediately, leaving cancel_1, cancel_2, cancel_3 in the heap
          bool c1 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_1");
          bool c2 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_2");
          bool c3 = App.scheduler.cancel_timeout(id(test_sensor), "cancel_3");

          ESP_LOGD("test", "Cancelled items (behind blocker): %s, %s, %s",
                   c1 ? "true" : "false",
                   c2 ? "true" : "false",
                   c3 ? "true" : "false");

          // The heap now has:
          // - "blocker" at front (not cancelled)
          // - cancelled items behind it (marked remove=true but still in heap)
          // - When all execute at once, cleanup_() stops at "blocker"
          // - The loop then executes ALL ready items including cancelled ones

          ESP_LOGD("test", "Setup complete. Blocker at front prevents cleanup of cancelled items behind it");

      # Wait for all timeouts to execute (or not)
      - delay: 20ms

      # Check results
      - lambda: |-
          ESP_LOGI("test", "=== Test Results ===");
          ESP_LOGI("test", "Normal items executed: %d (expected 4)", id(normal_item_executed));
          ESP_LOGI("test", "Removed items executed: %d (expected 0)", id(removed_item_executed));

          if (id(removed_item_executed) > 0) {
            ESP_LOGE("test", "TEST FAILED: %d cancelled items were executed!", id(removed_item_executed));
            id(test_passed) = false;
          } else if (id(normal_item_executed) != 4) {
            ESP_LOGE("test", "TEST FAILED: Expected 4 normal items, got %d", id(normal_item_executed));
            id(test_passed) = false;
          } else {
            ESP_LOGI("test", "TEST PASSED: No cancelled items were executed");
          }

          ESP_LOGI("test", "=== Test Complete ===");
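The fixture depends on "blocker" staying at the front of the heap even though all items share the same execution time. Assuming the scheduler's heap behaves like std::push_heap with a strict less-than comparison, a later push with an equal key never displaces the existing front, as this side sketch shows:

// Sketch: with a strict comparator, equal-key pushes do not sift above
// the existing front, so the first item pushed stays at the root.
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  struct Entry { int key; const char *name; };
  auto later = [](const Entry &a, const Entry &b) { return a.key > b.key; };

  std::vector<Entry> heap;
  for (auto e : {Entry{1, "blocker"}, Entry{1, "cancel_1"}, Entry{1, "normal_1"}}) {
    heap.push_back(e);
    std::push_heap(heap.begin(), heap.end(), later);  // equal keys don't pass the parent
  }
  std::printf("front of heap: %s\n", heap.front().name);  // prints "blocker"
}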
tests/integration/test_scheduler_removed_item_race.py (new file, 102 lines)
@@ -0,0 +1,102 @@
"""Test for scheduler race condition where removed items still execute."""

import asyncio
import re

import pytest

from .types import APIClientConnectedFactory, RunCompiledFunction


@pytest.mark.asyncio
async def test_scheduler_removed_item_race(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that items marked for removal don't execute.

    This test verifies the fix for a race condition where:
    1. cleanup_() only removes items from the front of the heap
    2. Items in the middle of the heap marked for removal still execute
    3. This causes cancelled timeouts to run when they shouldn't
    """
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[bool] = loop.create_future()

    # Track test results
    test_passed = False
    removed_executed = 0
    normal_executed = 0

    # Patterns to match
    race_pattern = re.compile(r"RACE: .* executed after being cancelled!")
    passed_pattern = re.compile(r"TEST PASSED")
    failed_pattern = re.compile(r"TEST FAILED")
    complete_pattern = re.compile(r"=== Test Complete ===")
    normal_count_pattern = re.compile(r"Normal items executed: (\d+)")
    removed_count_pattern = re.compile(r"Removed items executed: (\d+)")

    def check_output(line: str) -> None:
        """Check log output for test results."""
        nonlocal test_passed, removed_executed, normal_executed

        if race_pattern.search(line):
            # Race condition detected - a cancelled item executed
            test_passed = False

        if passed_pattern.search(line):
            test_passed = True
        elif failed_pattern.search(line):
            test_passed = False

        normal_match = normal_count_pattern.search(line)
        if normal_match:
            normal_executed = int(normal_match.group(1))

        removed_match = removed_count_pattern.search(line)
        if removed_match:
            removed_executed = int(removed_match.group(1))

        if not test_complete_future.done() and complete_pattern.search(line):
            test_complete_future.set_result(True)

    async with (
        run_compiled(yaml_config, line_callback=check_output),
        api_client_connected() as client,
    ):
        # Verify we can connect
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "scheduler-removed-item-race"

        # List services
        _, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Find run_test service
        run_test_service = next((s for s in services if s.name == "run_test"), None)
        assert run_test_service is not None, "run_test service not found"

        # Execute the test
        client.execute_service(run_test_service, {})

        # Wait for test completion
        try:
            await asyncio.wait_for(test_complete_future, timeout=5.0)
        except TimeoutError:
            pytest.fail("Test did not complete within timeout")

    # Verify results
    assert test_passed, (
        f"Test failed! Removed items executed: {removed_executed}, "
        f"Normal items executed: {normal_executed}"
    )
    assert removed_executed == 0, (
        f"Cancelled items should not execute, but {removed_executed} did"
    )
    assert normal_executed == 4, (
        f"Expected 4 normal items to execute, got {normal_executed}"
    )