1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-25 06:32:22 +01:00

Fix scheduler race conditions and add comprehensive test suite (#9348)

This commit is contained in:
J. Nick Koston
2025-07-07 14:57:55 -05:00
committed by GitHub
parent 138ff749f3
commit 3ef392d433
45 changed files with 2686 additions and 102 deletions

View File

@@ -62,16 +62,16 @@ static void validate_static_string(const char *name) {
void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type type, bool is_static_string,
const void *name_ptr, uint32_t delay, std::function<void()> func) {
// Get the name as const char*
const char *name_cstr =
is_static_string ? static_cast<const char *>(name_ptr) : static_cast<const std::string *>(name_ptr)->c_str();
const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
// Cancel existing timer if name is not empty
if (name_cstr != nullptr && name_cstr[0] != '\0') {
this->cancel_item_(component, name_cstr, type);
}
if (delay == SCHEDULER_DONT_RUN)
if (delay == SCHEDULER_DONT_RUN) {
// Still need to cancel existing timer if name is not empty
if (this->is_name_valid_(name_cstr)) {
LockGuard guard{this->lock_};
this->cancel_item_locked_(component, name_cstr, type);
}
return;
}
// Create and populate the scheduler item
auto item = make_unique<SchedulerItem>();
@@ -87,6 +87,7 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
if (delay == 0 && type == SchedulerItem::TIMEOUT) {
// Put in defer queue for guaranteed FIFO execution
LockGuard guard{this->lock_};
this->cancel_item_locked_(component, name_cstr, type);
this->defer_queue_.push_back(std::move(item));
return;
}
@@ -122,7 +123,15 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
}
#endif
this->push_(std::move(item));
LockGuard guard{this->lock_};
// If name is provided, do atomic cancel-and-add
if (this->is_name_valid_(name_cstr)) {
// Cancel existing items
this->cancel_item_locked_(component, name_cstr, type);
}
// Add new item directly to to_add_
// since we have the lock held
this->to_add_.push_back(std::move(item));
}
void HOT Scheduler::set_timeout(Component *component, const char *name, uint32_t timeout, std::function<void()> func) {
@@ -134,10 +143,10 @@ void HOT Scheduler::set_timeout(Component *component, const std::string &name, u
this->set_timer_common_(component, SchedulerItem::TIMEOUT, false, &name, timeout, std::move(func));
}
bool HOT Scheduler::cancel_timeout(Component *component, const std::string &name) {
return this->cancel_item_(component, name, SchedulerItem::TIMEOUT);
return this->cancel_item_(component, false, &name, SchedulerItem::TIMEOUT);
}
bool HOT Scheduler::cancel_timeout(Component *component, const char *name) {
return this->cancel_item_(component, name, SchedulerItem::TIMEOUT);
return this->cancel_item_(component, true, name, SchedulerItem::TIMEOUT);
}
void HOT Scheduler::set_interval(Component *component, const std::string &name, uint32_t interval,
std::function<void()> func) {
@@ -149,10 +158,10 @@ void HOT Scheduler::set_interval(Component *component, const char *name, uint32_
this->set_timer_common_(component, SchedulerItem::INTERVAL, true, name, interval, std::move(func));
}
bool HOT Scheduler::cancel_interval(Component *component, const std::string &name) {
return this->cancel_item_(component, name, SchedulerItem::INTERVAL);
return this->cancel_item_(component, false, &name, SchedulerItem::INTERVAL);
}
bool HOT Scheduler::cancel_interval(Component *component, const char *name) {
return this->cancel_item_(component, name, SchedulerItem::INTERVAL);
return this->cancel_item_(component, true, name, SchedulerItem::INTERVAL);
}
struct RetryArgs {
@@ -211,6 +220,9 @@ bool HOT Scheduler::cancel_retry(Component *component, const std::string &name)
}
optional<uint32_t> HOT Scheduler::next_schedule_in() {
// IMPORTANT: This method should only be called from the main thread (loop task).
// It calls empty_() and accesses items_[0] without holding a lock, which is only
// safe when called from the main thread. Other threads must not call this method.
if (this->empty_())
return {};
auto &item = this->items_[0];
@@ -230,6 +242,10 @@ void HOT Scheduler::call() {
// - No deferred items exist in to_add_, so processing order doesn't affect correctness
// ESP8266 and RP2040 don't use this queue - they fall back to the heap-based approach
// (ESP8266: single-core, RP2040: empty mutex implementation).
//
// Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still
// processed here. They are removed from the queue normally via pop_front() but skipped
// during execution by should_skip_item_(). This is intentional - no memory leak occurs.
while (!this->defer_queue_.empty()) {
// The outer check is done without a lock for performance. If the queue
// appears non-empty, we lock and process an item. We don't need to check
@@ -261,10 +277,12 @@ void HOT Scheduler::call() {
ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%u, %" PRIu32 ")", this->items_.size(), now, this->millis_major_,
this->last_millis_);
while (!this->empty_()) {
this->lock_.lock();
auto item = std::move(this->items_[0]);
this->pop_raw_();
this->lock_.unlock();
std::unique_ptr<SchedulerItem> item;
{
LockGuard guard{this->lock_};
item = std::move(this->items_[0]);
this->pop_raw_();
}
const char *name = item->get_name();
ESP_LOGD(TAG, " %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64,
@@ -278,33 +296,35 @@ void HOT Scheduler::call() {
{
LockGuard guard{this->lock_};
this->items_ = std::move(old_items);
// Rebuild heap after moving items back
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
}
}
#endif // ESPHOME_DEBUG_SCHEDULER
auto to_remove_was = to_remove_;
auto items_was = this->items_.size();
// If we have too many items to remove
if (to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
// We hold the lock for the entire cleanup operation because:
// 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
// 2. Other threads must see either the old state or the new state, not intermediate states
// 3. The operation is already expensive (O(n)), so lock overhead is negligible
// 4. No operations inside can block or take other locks, so no deadlock risk
LockGuard guard{this->lock_};
std::vector<std::unique_ptr<SchedulerItem>> valid_items;
while (!this->empty_()) {
LockGuard guard{this->lock_};
auto item = std::move(this->items_[0]);
this->pop_raw_();
valid_items.push_back(std::move(item));
// Move all non-removed items to valid_items
for (auto &item : this->items_) {
if (!item->remove) {
valid_items.push_back(std::move(item));
}
}
{
LockGuard guard{this->lock_};
this->items_ = std::move(valid_items);
}
// The following should not happen unless I'm missing something
if (to_remove_ != 0) {
ESP_LOGW(TAG, "to_remove_ was %" PRIu32 " now: %" PRIu32 " items where %zu now %zu. Please report this",
to_remove_was, to_remove_, items_was, items_.size());
to_remove_ = 0;
}
// Replace items_ with the filtered list
this->items_ = std::move(valid_items);
// Rebuild the heap structure since items are no longer in heap order
std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
this->to_remove_ = 0;
}
while (!this->empty_()) {
@@ -336,26 +356,25 @@ void HOT Scheduler::call() {
}
{
this->lock_.lock();
LockGuard guard{this->lock_};
// new scope, item from before might have been moved in the vector
auto item = std::move(this->items_[0]);
// Only pop after function call, this ensures we were reachable
// during the function call and know if we were cancelled.
this->pop_raw_();
this->lock_.unlock();
if (item->remove) {
// We were removed/cancelled in the function call, stop
to_remove_--;
this->to_remove_--;
continue;
}
if (item->type == SchedulerItem::INTERVAL) {
item->next_execution_ = now + item->interval;
this->push_(std::move(item));
// Add new item directly to to_add_
// since we have the lock held
this->to_add_.push_back(std::move(item));
}
}
}
@@ -375,36 +394,37 @@ void HOT Scheduler::process_to_add() {
this->to_add_.clear();
}
void HOT Scheduler::cleanup_() {
// Fast path: if nothing to remove, just return
// Reading to_remove_ without lock is safe because:
// 1. We only call this from the main thread during call()
// 2. If it's 0, there's definitely nothing to cleanup
// 3. If it becomes non-zero after we check, cleanup will happen on the next loop iteration
// 4. Not all platforms support atomics, so we accept this race in favor of performance
// 5. The worst case is a one-loop-iteration delay in cleanup, which is harmless
if (this->to_remove_ == 0)
return;
// We must hold the lock for the entire cleanup operation because:
// 1. We're modifying items_ (via pop_raw_) which requires exclusive access
// 2. We're decrementing to_remove_ which is also modified by other threads
// (though all modifications are already under lock)
// 3. Other threads read items_ when searching for items to cancel in cancel_item_locked_()
// 4. We need a consistent view of items_ and to_remove_ throughout the operation
// Without the lock, we could access items_ while another thread is reading it,
// leading to race conditions
LockGuard guard{this->lock_};
while (!this->items_.empty()) {
auto &item = this->items_[0];
if (!item->remove)
return;
to_remove_--;
{
LockGuard guard{this->lock_};
this->pop_raw_();
}
this->to_remove_--;
this->pop_raw_();
}
}
void HOT Scheduler::pop_raw_() {
std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
this->items_.pop_back();
}
void HOT Scheduler::push_(std::unique_ptr<Scheduler::SchedulerItem> item) {
LockGuard guard{this->lock_};
this->to_add_.push_back(std::move(item));
}
// Helper function to check if item matches criteria for cancellation
bool HOT Scheduler::matches_item_(const std::unique_ptr<SchedulerItem> &item, Component *component,
const char *name_cstr, SchedulerItem::Type type) {
if (item->component != component || item->type != type || item->remove) {
return false;
}
const char *item_name = item->get_name();
return item_name != nullptr && strcmp(name_cstr, item_name) == 0;
}
// Helper to execute a scheduler item
void HOT Scheduler::execute_item_(SchedulerItem *item) {
@@ -417,55 +437,56 @@ void HOT Scheduler::execute_item_(SchedulerItem *item) {
}
// Common implementation for cancel operations
bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr,
SchedulerItem::Type type) {
bool HOT Scheduler::cancel_item_(Component *component, bool is_static_string, const void *name_ptr,
SchedulerItem::Type type) {
// Get the name as const char*
const char *name_cstr =
is_static_string ? static_cast<const char *>(name_ptr) : static_cast<const std::string *>(name_ptr)->c_str();
const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
// Handle null or empty names
if (name_cstr == nullptr)
if (!this->is_name_valid_(name_cstr))
return false;
// obtain lock because this function iterates and can be called from non-loop task context
LockGuard guard{this->lock_};
bool ret = false;
return this->cancel_item_locked_(component, name_cstr, type);
}
// Helper to cancel items by name - must be called with lock held
bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_cstr, SchedulerItem::Type type) {
size_t total_cancelled = 0;
// Check all containers for matching items
#if !defined(USE_ESP8266) && !defined(USE_RP2040)
// Only check defer_queue_ on platforms that have it
for (auto &item : this->defer_queue_) {
if (this->matches_item_(item, component, name_cstr, type)) {
item->remove = true;
ret = true;
// Only check defer queue for timeouts (intervals never go there)
if (type == SchedulerItem::TIMEOUT) {
for (auto &item : this->defer_queue_) {
if (this->matches_item_(item, component, name_cstr, type)) {
item->remove = true;
total_cancelled++;
}
}
}
#endif
// Cancel items in the main heap
for (auto &item : this->items_) {
if (this->matches_item_(item, component, name_cstr, type)) {
item->remove = true;
ret = true;
this->to_remove_++; // Only track removals for heap items
total_cancelled++;
this->to_remove_++; // Track removals for heap items
}
}
// Cancel items in to_add_
for (auto &item : this->to_add_) {
if (this->matches_item_(item, component, name_cstr, type)) {
item->remove = true;
ret = true;
total_cancelled++;
// Don't track removals for to_add_ items
}
}
return ret;
}
bool HOT Scheduler::cancel_item_(Component *component, const std::string &name, Scheduler::SchedulerItem::Type type) {
return this->cancel_item_common_(component, false, &name, type);
}
bool HOT Scheduler::cancel_item_(Component *component, const char *name, SchedulerItem::Type type) {
return this->cancel_item_common_(component, true, name, type);
return total_cancelled > 0;
}
uint64_t Scheduler::millis_() {