mirror of https://github.com/esphome/esphome.git (synced 2025-09-12 08:12:22 +01:00)
[core] Add memory pool to scheduler to reduce heap fragmentation (#10536)
@@ -14,7 +14,20 @@ namespace esphome {
 static const char *const TAG = "scheduler";
 
-static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
+// Memory pool configuration constants
+// Pool size of 5 matches typical usage patterns (2-4 active timers)
+// - Minimal memory overhead (~250 bytes on ESP32)
+// - Sufficient for most configs with a couple sensors/components
+// - Still prevents heap fragmentation and allocation stalls
+// - Complex setups with many timers will just allocate beyond the pool
+// See https://github.com/esphome/backlog/issues/52
+static constexpr size_t MAX_POOL_SIZE = 5;
+
+// Maximum number of logically deleted (cancelled) items before forcing cleanup.
+// Set to 5 to match the pool size - when we have as many cancelled items as our
+// pool can hold, it's time to clean up and recycle them.
+static constexpr uint32_t MAX_LOGICALLY_DELETED_ITEMS = 5;
 
 // Half the 32-bit range - used to detect rollovers vs normal time progression
 static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
 // max delay to start an interval sequence
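For illustration, a minimal self-contained sketch of the bounded-pool pattern these constants configure: reuse a recycled object when one is available, heap-allocate otherwise, and cap the pool so complex setups simply allocate past it. The names (BoundedPool, Item, acquire, release) are invented for the sketch, not the ESPHome API.

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Illustrative stand-in for SchedulerItem; the real struct holds the
// callback, name storage and timing fields (~50 bytes on ESP32, so a
// full pool of 5 costs roughly 250 bytes).
struct Item {
  unsigned interval{0};
};

class BoundedPool {
 public:
  static constexpr std::size_t kMaxSize = 5;  // mirrors MAX_POOL_SIZE

  // Reuse a pooled object when available, otherwise heap-allocate.
  std::unique_ptr<Item> acquire() {
    if (!pool_.empty()) {
      auto item = std::move(pool_.back());
      pool_.pop_back();
      return item;
    }
    return std::make_unique<Item>();
  }

  // Return an object to the pool; when full, unique_ptr frees it.
  void release(std::unique_ptr<Item> item) {
    if (pool_.size() < kMaxSize)
      pool_.push_back(std::move(item));
  }

 private:
  std::vector<std::unique_ptr<Item>> pool_;
};

int main() {
  BoundedPool pool;
  auto a = pool.acquire();  // pool empty: heap-allocates
  pool.release(std::move(a));
  auto b = pool.acquire();  // reuses the recycled object
  pool.release(std::move(b));
}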
@@ -79,8 +92,28 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
     return;
   }
 
+  // Get fresh timestamp BEFORE taking lock - millis_64_ may need to acquire lock itself
+  const uint64_t now = this->millis_64_(millis());
+
+  // Take lock early to protect scheduler_item_pool_ access
+  LockGuard guard{this->lock_};
+
   // Create and populate the scheduler item
-  auto item = make_unique<SchedulerItem>();
+  std::unique_ptr<SchedulerItem> item;
+  if (!this->scheduler_item_pool_.empty()) {
+    // Reuse from pool
+    item = std::move(this->scheduler_item_pool_.back());
+    this->scheduler_item_pool_.pop_back();
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Reused item from pool (pool size now: %zu)", this->scheduler_item_pool_.size());
+#endif
+  } else {
+    // Allocate new if pool is empty
+    item = make_unique<SchedulerItem>();
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Allocated new item (pool empty)");
+#endif
+  }
   item->component = component;
   item->set_name(name_cstr, !is_static_string);
   item->type = type;
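A note on ordering in this hunk: the timestamp is taken before LockGuard because millis_64_ may itself need the scheduler lock, and taking a non-recursive lock twice on one thread deadlocks. A hedged standalone illustration of the hazard, assuming a plain std::mutex (the real LockGuard/lock types differ):

#include <cstdint>
#include <mutex>

std::mutex scheduler_lock;

uint64_t millis_64() {
  // Assume this helper sometimes takes the scheduler lock itself,
  // e.g. to update 32-bit rollover state. Calling it while already
  // holding a non-recursive scheduler_lock would deadlock.
  std::lock_guard<std::mutex> guard(scheduler_lock);
  return 0;  // placeholder timestamp
}

void schedule_something() {
  const uint64_t now = millis_64();  // safe: lock not yet held
  std::lock_guard<std::mutex> guard(scheduler_lock);
  (void) now;  // ... use `now` under the lock ...
}

int main() { schedule_something(); }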
@@ -99,7 +132,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   // Single-core platforms don't need thread-safe defer handling
   if (delay == 0 && type == SchedulerItem::TIMEOUT) {
     // Put in defer queue for guaranteed FIFO execution
-    LockGuard guard{this->lock_};
     if (!skip_cancel) {
       this->cancel_item_locked_(component, name_cstr, type);
     }
@@ -108,9 +140,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   }
 #endif /* not ESPHOME_THREAD_SINGLE */
 
-  // Get fresh timestamp for new timer/interval - ensures accurate scheduling
-  const auto now = this->millis_64_(millis());  // Fresh millis() call
-
   // Type-specific setup
   if (type == SchedulerItem::INTERVAL) {
     item->interval = delay;
@@ -142,8 +171,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   }
 #endif /* ESPHOME_DEBUG_SCHEDULER */
 
-  LockGuard guard{this->lock_};
-
   // For retries, check if there's a cancelled timeout first
   if (is_retry && name_cstr != nullptr && type == SchedulerItem::TIMEOUT &&
       (has_cancelled_timeout_in_container_(this->items_, component, name_cstr, /* match_retry= */ true) ||
@@ -319,6 +346,8 @@ void HOT Scheduler::call(uint32_t now) {
       if (!this->should_skip_item_(item.get())) {
         this->execute_item_(item.get(), now);
       }
+      // Recycle the defer item after execution
+      this->recycle_item_(std::move(item));
     }
 #endif /* not ESPHOME_THREAD_SINGLE */
 
@@ -338,11 +367,11 @@ void HOT Scheduler::call(uint32_t now) {
 #ifdef ESPHOME_THREAD_MULTI_ATOMICS
     const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
     const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
-    ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
-             major_dbg, last_dbg);
+    ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
+             this->scheduler_item_pool_.size(), now_64, major_dbg, last_dbg);
 #else /* not ESPHOME_THREAD_MULTI_ATOMICS */
-    ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
-             this->millis_major_, this->last_millis_);
+    ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
+             this->scheduler_item_pool_.size(), now_64, this->millis_major_, this->last_millis_);
 #endif /* else ESPHOME_THREAD_MULTI_ATOMICS */
     // Cleanup before debug output
     this->cleanup_();
@@ -355,9 +384,10 @@ void HOT Scheduler::call(uint32_t now) {
       }
 
       const char *name = item->get_name();
-      ESP_LOGD(TAG, "  %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64,
+      bool is_cancelled = is_item_removed_(item.get());
+      ESP_LOGD(TAG, "  %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64 "%s",
                item->get_type_str(), item->get_source(), name ? name : "(null)", item->interval,
-               item->next_execution_ - now_64, item->next_execution_);
+               item->next_execution_ - now_64, item->next_execution_, is_cancelled ? " [CANCELLED]" : "");
 
       old_items.push_back(std::move(item));
     }
@@ -372,8 +402,13 @@ void HOT Scheduler::call(uint32_t now) {
   }
 #endif /* ESPHOME_DEBUG_SCHEDULER */
 
-  // If we have too many items to remove
-  if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
+  // Cleanup removed items before processing
+  // First try to clean items from the top of the heap (fast path)
+  this->cleanup_();
+
+  // If we still have too many cancelled items, do a full cleanup
+  // This only happens if cancelled items are stuck in the middle/bottom of the heap
+  if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
     // We hold the lock for the entire cleanup operation because:
     // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
     // 2. Other threads must see either the old state or the new state, not intermediate states
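The fast path works because a binary heap only supports cheap removal at its top: cancelled items can be popped for as long as they happen to lead the heap, while ones buried in the middle must wait for the full rebuild above. A simplified sketch of the idea, with plain values instead of unique_ptr<SchedulerItem> and a remove flag standing in for the real removal check:

#include <algorithm>
#include <vector>

struct Item {
  unsigned next_execution;
  bool remove{false};
};

// Min-heap on next_execution: std::*_heap build max-heaps, so
// "greater" puts the soonest item at the front.
static bool cmp(const Item &a, const Item &b) { return a.next_execution > b.next_execution; }

// Fast path: discard cancelled items only while they sit at the top.
void cleanup_top(std::vector<Item> &items) {
  while (!items.empty() && items.front().remove) {
    std::pop_heap(items.begin(), items.end(), cmp);
    items.pop_back();
  }
}

int main() {
  std::vector<Item> items = {{5, true}, {10, false}, {7, true}};
  std::make_heap(items.begin(), items.end(), cmp);
  cleanup_top(items);  // pops 5 and 7; 10 (not cancelled) remains
}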
@@ -383,10 +418,13 @@ void HOT Scheduler::call(uint32_t now) {
 
     std::vector<std::unique_ptr<SchedulerItem>> valid_items;
 
-    // Move all non-removed items to valid_items
+    // Move all non-removed items to valid_items, recycle removed ones
     for (auto &item : this->items_) {
-      if (!item->remove) {
+      if (!is_item_removed_(item.get())) {
         valid_items.push_back(std::move(item));
+      } else {
+        // Recycle removed items
+        this->recycle_item_(std::move(item));
       }
     }
 
@@ -396,9 +434,6 @@ void HOT Scheduler::call(uint32_t now) {
     std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
     this->to_remove_ = 0;
   }
 
-  // Cleanup removed items before processing
-  this->cleanup_();
-
   while (!this->items_.empty()) {
     // use scoping to indicate visibility of `item` variable
     {
@@ -472,6 +507,9 @@ void HOT Scheduler::call(uint32_t now) {
         // Add new item directly to to_add_
         // since we have the lock held
         this->to_add_.push_back(std::move(item));
+      } else {
+        // Timeout completed - recycle it
+        this->recycle_item_(std::move(item));
       }
 
       has_added_items |= !this->to_add_.empty();
@@ -485,7 +523,9 @@
 void HOT Scheduler::process_to_add() {
   LockGuard guard{this->lock_};
   for (auto &it : this->to_add_) {
-    if (it->remove) {
+    if (is_item_removed_(it.get())) {
+      // Recycle cancelled items
+      this->recycle_item_(std::move(it));
       continue;
     }
 
@@ -525,6 +565,10 @@ size_t HOT Scheduler::cleanup_() {
 }
 void HOT Scheduler::pop_raw_() {
   std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
+
+  // Instead of destroying, recycle the item
+  this->recycle_item_(std::move(this->items_.back()));
+
   this->items_.pop_back();
 }
 
@@ -559,7 +603,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 
   // Check all containers for matching items
 #ifndef ESPHOME_THREAD_SINGLE
-  // Only check defer queue for timeouts (intervals never go there)
+  // Mark items in defer queue as cancelled (they'll be skipped when processed)
   if (type == SchedulerItem::TIMEOUT) {
     for (auto &item : this->defer_queue_) {
       if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
@@ -571,11 +615,22 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 #endif /* not ESPHOME_THREAD_SINGLE */
 
   // Cancel items in the main heap
-  for (auto &item : this->items_) {
-    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      this->mark_item_removed_(item.get());
+  // Special case: if the last item in the heap matches, we can remove it immediately
+  // (removing the last element doesn't break heap structure)
+  if (!this->items_.empty()) {
+    auto &last_item = this->items_.back();
+    if (this->matches_item_(last_item, component, name_cstr, type, match_retry)) {
+      this->recycle_item_(std::move(this->items_.back()));
+      this->items_.pop_back();
       total_cancelled++;
-      this->to_remove_++;  // Track removals for heap items
+    }
+  }
+  // For other items in heap, we can only mark for removal (can't remove from middle of heap)
+  for (auto &item : this->items_) {
+    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
+      this->mark_item_removed_(item.get());
+      total_cancelled++;
+      this->to_remove_++;  // Track removals for heap items
+    }
     }
   }
 
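The special case rests on a heap property: dropping the last element of the backing vector removes a leaf, so no parent/child ordering is disturbed and no sift is needed. A quick standalone check with integer keys:

#include <algorithm>
#include <cassert>
#include <vector>

int main() {
  std::vector<int> heap = {9, 5, 7, 1, 3};
  std::make_heap(heap.begin(), heap.end());
  heap.pop_back();  // remove the last slot directly, no re-heapify
  assert(std::is_heap(heap.begin(), heap.end()));
}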
@@ -754,4 +809,25 @@ bool HOT Scheduler::SchedulerItem::cmp(const std::unique_ptr<SchedulerItem> &a,
   return a->next_execution_ > b->next_execution_;
 }
 
+void Scheduler::recycle_item_(std::unique_ptr<SchedulerItem> item) {
+  if (!item)
+    return;
+
+  if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
+    // Clear callback to release captured resources
+    item->callback = nullptr;
+    // Clear dynamic name if any
+    item->clear_dynamic_name();
+    this->scheduler_item_pool_.push_back(std::move(item));
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Recycled item to pool (pool size now: %zu)", this->scheduler_item_pool_.size());
+#endif
+  } else {
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Pool full (size: %zu), deleting item", this->scheduler_item_pool_.size());
+#endif
+  }
+  // else: unique_ptr will delete the item when it goes out of scope
+}
+
 }  // namespace esphome
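Clearing the callback before pooling is what keeps recycled items cheap: a pooled std::function would otherwise pin every resource its lambda captured until the item is next reused. A small demonstration with a captured shared_ptr (illustrative types, not the ESPHome ones):

#include <cassert>
#include <functional>
#include <memory>

int main() {
  auto resource = std::make_shared<int>(42);
  std::function<void()> callback = [resource]() { /* uses resource */ };
  assert(resource.use_count() == 2);  // the lambda holds a copy

  callback = nullptr;  // what recycle_item_ does before pooling
  assert(resource.use_count() == 1);  // capture released immediately
}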
@@ -142,11 +142,7 @@ class Scheduler {
     }
 
     // Destructor to clean up dynamic names
-    ~SchedulerItem() {
-      if (name_is_dynamic) {
-        delete[] name_.dynamic_name;
-      }
-    }
+    ~SchedulerItem() { clear_dynamic_name(); }
 
     // Delete copy operations to prevent accidental copies
     SchedulerItem(const SchedulerItem &) = delete;
@@ -159,13 +155,19 @@ class Scheduler {
     // Helper to get the name regardless of storage type
     const char *get_name() const { return name_is_dynamic ? name_.dynamic_name : name_.static_name; }
 
+    // Helper to clear dynamic name if allocated
+    void clear_dynamic_name() {
+      if (name_is_dynamic && name_.dynamic_name) {
+        delete[] name_.dynamic_name;
+        name_.dynamic_name = nullptr;
+        name_is_dynamic = false;
+      }
+    }
+
     // Helper to set name with proper ownership
     void set_name(const char *name, bool make_copy = false) {
       // Clean up old dynamic name if any
-      if (name_is_dynamic && name_.dynamic_name) {
-        delete[] name_.dynamic_name;
-        name_is_dynamic = false;
-      }
+      clear_dynamic_name();
 
       if (!name) {
         // nullptr case - no name provided
@@ -214,6 +216,15 @@ class Scheduler {
   // Common implementation for cancel operations
   bool cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type);
 
+  // Helper to check if two scheduler item names match
+  inline bool HOT names_match_(const char *name1, const char *name2) const {
+    // Check pointer equality first (common for static strings), then string contents
+    // The core ESPHome codebase uses static strings (const char*) for component names,
+    // making pointer comparison effective. The std::string overloads exist only for
+    // compatibility with external components but are rarely used in practice.
+    return (name1 != nullptr && name2 != nullptr) && ((name1 == name2) || (strcmp(name1, name2) == 0));
+  }
+
   // Helper function to check if item matches criteria for cancellation
   inline bool HOT matches_item_(const std::unique_ptr<SchedulerItem> &item, Component *component, const char *name_cstr,
                                 SchedulerItem::Type type, bool match_retry, bool skip_removed = true) const {
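names_match_ can short-circuit on pointer equality because equal pointers imply equal contents; the strcmp slow path only runs for distinct storage that may still hold equal text. A standalone sketch of the same logic:

#include <cassert>
#include <cstring>

static bool names_match(const char *name1, const char *name2) {
  return (name1 != nullptr && name2 != nullptr) &&
         ((name1 == name2) || (std::strcmp(name1, name2) == 0));
}

int main() {
  static const char *const kName = "update";  // shared static string
  char copy[] = "update";                     // same text, different storage
  assert(names_match(kName, kName));     // fast path: same pointer
  assert(names_match(kName, copy));      // slow path: strcmp
  assert(!names_match(kName, nullptr));  // nullptr never matches
}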
@@ -221,29 +232,20 @@ class Scheduler {
         (match_retry && !item->is_retry)) {
       return false;
     }
-    const char *item_name = item->get_name();
-    if (item_name == nullptr) {
-      return false;
-    }
-    // Fast path: if pointers are equal
-    // This is effective because the core ESPHome codebase uses static strings (const char*)
-    // for component names. The std::string overloads exist only for compatibility with
-    // external components, but are rarely used in practice.
-    if (item_name == name_cstr) {
-      return true;
-    }
-    // Slow path: compare string contents
-    return strcmp(name_cstr, item_name) == 0;
+    return this->names_match_(item->get_name(), name_cstr);
   }
 
   // Helper to execute a scheduler item
   void execute_item_(SchedulerItem *item, uint32_t now);
 
   // Helper to check if item should be skipped
-  bool should_skip_item_(const SchedulerItem *item) const {
-    return item->remove || (item->component != nullptr && item->component->is_failed());
+  bool should_skip_item_(SchedulerItem *item) const {
+    return is_item_removed_(item) || (item->component != nullptr && item->component->is_failed());
   }
 
+  // Helper to recycle a SchedulerItem
+  void recycle_item_(std::unique_ptr<SchedulerItem> item);
+
   // Helper to check if item is marked for removal (platform-specific)
   // Returns true if item should be skipped, handles platform-specific synchronization
   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
@@ -280,8 +282,9 @@ class Scheduler {
   bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
                                            bool match_retry) const {
     for (const auto &item : container) {
-      if (item->remove && this->matches_item_(item, component, name_cstr, SchedulerItem::TIMEOUT, match_retry,
-                                              /* skip_removed= */ false)) {
+      if (is_item_removed_(item.get()) &&
+          this->matches_item_(item, component, name_cstr, SchedulerItem::TIMEOUT, match_retry,
+                              /* skip_removed= */ false)) {
         return true;
       }
     }
@@ -297,6 +300,16 @@ class Scheduler {
 #endif /* ESPHOME_THREAD_SINGLE */
   uint32_t to_remove_{0};
 
+  // Memory pool for recycling SchedulerItem objects to reduce heap churn.
+  // Design decisions:
+  // - std::vector is used instead of a fixed array because many systems only need 1-2 scheduler items
+  // - The vector grows dynamically up to MAX_POOL_SIZE (5) only when needed, saving memory on simple setups
+  // - Pool size of 5 matches typical usage (2-4 timers) while keeping memory overhead low (~250 bytes on ESP32)
+  // - The pool significantly reduces heap fragmentation which is critical because heap allocation/deallocation
+  //   can stall the entire system, causing timing issues and dropped events for any components that need
+  //   to synchronize between tasks (see https://github.com/esphome/backlog/issues/52)
+  std::vector<std::unique_ptr<SchedulerItem>> scheduler_item_pool_;
+
 #ifdef ESPHOME_THREAD_MULTI_ATOMICS
   /*
    * Multi-threaded platforms with atomic support: last_millis_ needs atomic for lock-free updates
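The ~250-byte estimate above is roughly MAX_POOL_SIZE times the per-item cost (the item itself plus its unique_ptr slot in the vector). A hedged back-of-the-envelope sketch; ItemLike is an invented stand-in, since the real SchedulerItem layout varies by platform and build flags:

#include <cstdio>

// Invented stand-in with fields similar in spirit to SchedulerItem.
struct ItemLike {
  void *component;
  const char *name;
  unsigned long long next_execution;
  unsigned interval;
  unsigned char flags;
};

int main() {
  // 5 pooled items plus the 5 pointer slots held by the vector.
  std::printf("approx pool footprint: %zu bytes\n",
              5 * (sizeof(ItemLike) + sizeof(void *)));
}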
tests/integration/fixtures/scheduler_pool.yaml (new file, 282 lines)
@@ -0,0 +1,282 @@
esphome:
  name: scheduler-pool-test
  on_boot:
    priority: -100
    then:
      - logger.log: "Starting scheduler pool tests"
  debug_scheduler: true  # Enable scheduler debug logging

host:
api:
  services:
    - service: run_phase_1
      then:
        - script.execute: test_pool_recycling
    - service: run_phase_2
      then:
        - script.execute: test_sensor_polling
    - service: run_phase_3
      then:
        - script.execute: test_communication_patterns
    - service: run_phase_4
      then:
        - script.execute: test_defer_patterns
    - service: run_phase_5
      then:
        - script.execute: test_pool_reuse_verification
    - service: run_phase_6
      then:
        - script.execute: test_full_pool_reuse
    - service: run_phase_7
      then:
        - script.execute: test_same_defer_optimization
    - service: run_complete
      then:
        - script.execute: complete_test

logger:
  level: VERY_VERBOSE  # Need VERY_VERBOSE to see pool debug messages

globals:
  - id: create_count
    type: int
    initial_value: '0'
  - id: cancel_count
    type: int
    initial_value: '0'
  - id: interval_counter
    type: int
    initial_value: '0'
  - id: pool_test_done
    type: bool
    initial_value: 'false'

script:
  - id: test_pool_recycling
    then:
      - logger.log: "Testing scheduler pool recycling with realistic usage patterns"
      - lambda: |-
          auto *component = id(test_sensor);

          // Simulate realistic component behavior with timeouts that complete naturally
          ESP_LOGI("test", "Phase 1: Simulating normal component lifecycle");

          // Sensor update timeouts (common pattern)
          App.scheduler.set_timeout(component, "sensor_init", 10, []() {
            ESP_LOGD("test", "Sensor initialized");
            id(create_count)++;
          });

          // Retry timeout (gets cancelled if successful)
          App.scheduler.set_timeout(component, "retry_timeout", 50, []() {
            ESP_LOGD("test", "Retry timeout executed");
            id(create_count)++;
          });

          // Simulate successful operation - cancel retry
          App.scheduler.set_timeout(component, "success_sim", 20, []() {
            ESP_LOGD("test", "Operation succeeded, cancelling retry");
            App.scheduler.cancel_timeout(id(test_sensor), "retry_timeout");
            id(cancel_count)++;
          });

          id(create_count) += 3;
          ESP_LOGI("test", "Phase 1 complete");

  - id: test_sensor_polling
    then:
      - lambda: |-
          // Simulate sensor polling pattern
          ESP_LOGI("test", "Phase 2: Simulating sensor polling patterns");
          auto *component = id(test_sensor);

          // Multiple sensors with different update intervals
          // These should only allocate once and reuse the same item for each interval execution
          App.scheduler.set_interval(component, "temp_sensor", 10, []() {
            ESP_LOGD("test", "Temperature sensor update");
            id(interval_counter)++;
            if (id(interval_counter) >= 3) {
              App.scheduler.cancel_interval(id(test_sensor), "temp_sensor");
              ESP_LOGD("test", "Temperature sensor stopped");
            }
          });

          App.scheduler.set_interval(component, "humidity_sensor", 15, []() {
            ESP_LOGD("test", "Humidity sensor update");
            id(interval_counter)++;
            if (id(interval_counter) >= 5) {
              App.scheduler.cancel_interval(id(test_sensor), "humidity_sensor");
              ESP_LOGD("test", "Humidity sensor stopped");
            }
          });

          // Only 2 allocations for the intervals, no matter how many times they execute
          id(create_count) += 2;
          ESP_LOGD("test", "Created 2 intervals - they will reuse same items for each execution");
          ESP_LOGI("test", "Phase 2 complete");

  - id: test_communication_patterns
    then:
      - lambda: |-
          // Simulate communication patterns (WiFi/API reconnects, etc)
          ESP_LOGI("test", "Phase 3: Simulating communication patterns");
          auto *component = id(test_sensor);

          // Connection timeout pattern
          App.scheduler.set_timeout(component, "connect_timeout", 200, []() {
            ESP_LOGD("test", "Connection timeout - would retry");
            id(create_count)++;

            // Schedule retry
            App.scheduler.set_timeout(id(test_sensor), "connect_retry", 100, []() {
              ESP_LOGD("test", "Retrying connection");
              id(create_count)++;
            });
          });

          // Heartbeat pattern
          App.scheduler.set_interval(component, "heartbeat", 50, []() {
            ESP_LOGD("test", "Heartbeat");
            id(interval_counter)++;
            if (id(interval_counter) >= 10) {
              App.scheduler.cancel_interval(id(test_sensor), "heartbeat");
              ESP_LOGD("test", "Heartbeat stopped");
            }
          });

          id(create_count) += 2;
          ESP_LOGI("test", "Phase 3 complete");

  - id: test_defer_patterns
    then:
      - lambda: |-
          // Simulate defer patterns (state changes, async operations)
          ESP_LOGI("test", "Phase 4: Simulating heavy defer patterns like ratgdo");

          auto *component = id(test_sensor);

          // Simulate a burst of defer operations like ratgdo does with state updates
          // These should execute immediately and recycle quickly to the pool
          for (int i = 0; i < 10; i++) {
            std::string defer_name = "defer_" + std::to_string(i);
            App.scheduler.set_timeout(component, defer_name, 0, [i]() {
              ESP_LOGD("test", "Defer %d executed", i);
              // Force a small delay between defer executions to see recycling
              if (i == 5) {
                ESP_LOGI("test", "Half of defers executed, checking pool status");
              }
            });
          }

          id(create_count) += 10;
          ESP_LOGD("test", "Created 10 defer operations (0ms timeouts)");

          // Also create some named defers that might get replaced
          App.scheduler.set_timeout(component, "state_update", 0, []() {
            ESP_LOGD("test", "State update 1");
          });

          // Replace the same named defer (should cancel previous)
          App.scheduler.set_timeout(component, "state_update", 0, []() {
            ESP_LOGD("test", "State update 2 (replaced)");
          });

          id(create_count) += 2;
          id(cancel_count) += 1;  // One cancelled due to replacement

          ESP_LOGI("test", "Phase 4 complete");

  - id: test_pool_reuse_verification
    then:
      - lambda: |-
          ESP_LOGI("test", "Phase 5: Verifying pool reuse after everything settles");

          // Cancel any remaining intervals
          auto *component = id(test_sensor);
          App.scheduler.cancel_interval(component, "temp_sensor");
          App.scheduler.cancel_interval(component, "humidity_sensor");
          App.scheduler.cancel_interval(component, "heartbeat");

          ESP_LOGD("test", "Cancelled any remaining intervals");

          // The pool should have items from completed timeouts in earlier phases.
          // Phase 1 had 3 timeouts that completed and were recycled.
          // Phase 3 had 1 timeout that completed and was recycled.
          // Phase 4 had 3 defers that completed and were recycled.
          // So we should have a decent pool size already from naturally completed items.

          // Now create 8 new timeouts - they should reuse from pool when available
          int reuse_test_count = 8;

          for (int i = 0; i < reuse_test_count; i++) {
            std::string name = "reuse_test_" + std::to_string(i);
            App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
              ESP_LOGD("test", "Reuse test %d completed", i);
            });
          }

          ESP_LOGI("test", "Created %d items for reuse verification", reuse_test_count);
          id(create_count) += reuse_test_count;
          ESP_LOGI("test", "Phase 5 complete");

  - id: test_full_pool_reuse
    then:
      - lambda: |-
          ESP_LOGI("test", "Phase 6: Testing pool size limits after Phase 5 items complete");

          // At this point, all Phase 5 timeouts should have completed and been recycled.
          // The pool should be at its maximum size (5).
          // Creating 10 new items tests that:
          // - First 5 items reuse from the pool
          // - Remaining 5 items allocate new (pool empty)
          // - Pool doesn't grow beyond MAX_POOL_SIZE of 5

          auto *component = id(test_sensor);
          int full_reuse_count = 10;

          for (int i = 0; i < full_reuse_count; i++) {
            std::string name = "full_reuse_" + std::to_string(i);
            App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
              ESP_LOGD("test", "Full reuse test %d completed", i);
            });
          }

          ESP_LOGI("test", "Created %d items for full pool reuse verification", full_reuse_count);
          id(create_count) += full_reuse_count;
          ESP_LOGI("test", "Phase 6 complete");

  - id: test_same_defer_optimization
    then:
      - lambda: |-
          ESP_LOGI("test", "Phase 7: Testing same-named defer optimization");

          auto *component = id(test_sensor);

          // Create 10 defers with the same name - should optimize to update callback in-place
          // This pattern is common in components like ratgdo that repeatedly defer state updates
          for (int i = 0; i < 10; i++) {
            App.scheduler.set_timeout(component, "repeated_defer", 0, [i]() {
              ESP_LOGD("test", "Repeated defer executed with value: %d", i);
            });
          }

          // Only the first should allocate, the rest should update in-place
          // We expect only 1 allocation for all 10 operations
          id(create_count) += 1;  // Only count 1 since others should be optimized

          ESP_LOGD("test", "Created 10 same-named defers (should only allocate once)");
          ESP_LOGI("test", "Phase 7 complete");

  - id: complete_test
    then:
      - lambda: |-
          ESP_LOGI("test", "Pool recycling test complete - created %d items, cancelled %d, intervals %d",
                   id(create_count), id(cancel_count), id(interval_counter));

sensor:
  - platform: template
    name: Test Sensor
    id: test_sensor
    lambda: return 1.0;
    update_interval: never

# No interval - tests will be triggered from Python via API services
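Phase 7 depends on the scheduler updating a same-named 0ms defer in place instead of allocating a fresh item. A simplified sketch of that pattern with a hypothetical defer_named helper (not the actual implementation):

#include <deque>
#include <functional>
#include <string>

struct DeferItem {
  std::string name;
  std::function<void()> callback;
};

std::deque<DeferItem> defer_queue;

// Re-deferring under an existing name swaps the callback in place, so
// ten calls with the same name cost one allocation and only the last
// callback runs.
void defer_named(const std::string &name, std::function<void()> cb) {
  for (auto &item : defer_queue) {
    if (item.name == name) {
      item.callback = std::move(cb);  // update in place
      return;
    }
  }
  defer_queue.push_back({name, std::move(cb)});
}

int main() {
  for (int i = 0; i < 10; i++)
    defer_named("repeated_defer", [i]() { /* would log i */ });
  // defer_queue holds a single item whose callback captured i == 9.
}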
tests/integration/test_scheduler_pool.py (new file, 209 lines)
@@ -0,0 +1,209 @@
"""Integration test for scheduler memory pool functionality."""

from __future__ import annotations

import asyncio
import re

import pytest

from .types import APIClientConnectedFactory, RunCompiledFunction


@pytest.mark.asyncio
async def test_scheduler_pool(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that the scheduler memory pool is working correctly with realistic usage.

    This test simulates real-world scheduler usage patterns and verifies that:
    1. Items are recycled to the pool when timeouts complete naturally
    2. Items are recycled when intervals/timeouts are cancelled
    3. Items are reused from the pool for new scheduler operations
    4. The pool grows gradually based on actual usage patterns
    5. Pool operations are logged correctly with debug scheduler enabled
    """
    # Track log messages to verify pool behavior
    log_lines: list[str] = []
    pool_reuse_count = 0
    pool_recycle_count = 0
    pool_full_count = 0
    new_alloc_count = 0

    # Patterns to match pool operations
    reuse_pattern = re.compile(r"Reused item from pool \(pool size now: (\d+)\)")
    recycle_pattern = re.compile(r"Recycled item to pool \(pool size now: (\d+)\)")
    pool_full_pattern = re.compile(r"Pool full \(size: (\d+)\), deleting item")
    new_alloc_pattern = re.compile(r"Allocated new item \(pool empty\)")

    # Futures to track when test phases complete
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[bool] = loop.create_future()
    phase_futures = {
        1: loop.create_future(),
        2: loop.create_future(),
        3: loop.create_future(),
        4: loop.create_future(),
        5: loop.create_future(),
        6: loop.create_future(),
        7: loop.create_future(),
    }

    def check_output(line: str) -> None:
        """Check log output for pool operations and phase completion."""
        nonlocal pool_reuse_count, pool_recycle_count, pool_full_count, new_alloc_count
        log_lines.append(line)

        # Track pool operations
        if reuse_pattern.search(line):
            pool_reuse_count += 1
        elif recycle_pattern.search(line):
            pool_recycle_count += 1
        elif pool_full_pattern.search(line):
            pool_full_count += 1
        elif new_alloc_pattern.search(line):
            new_alloc_count += 1

        # Track phase completion
        for phase_num in range(1, 8):
            if (
                f"Phase {phase_num} complete" in line
                and phase_num in phase_futures
                and not phase_futures[phase_num].done()
            ):
                phase_futures[phase_num].set_result(True)

        # Check for test completion
        if "Pool recycling test complete" in line and not test_complete_future.done():
            test_complete_future.set_result(True)

    # Run the test with log monitoring
    async with (
        run_compiled(yaml_config, line_callback=check_output),
        api_client_connected() as client,
    ):
        # Verify device is running
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "scheduler-pool-test"

        # Get list of services
        entities, services = await client.list_entities_services()
        service_names = {s.name for s in services}

        # Verify all test services are available
        expected_services = {
            "run_phase_1",
            "run_phase_2",
            "run_phase_3",
            "run_phase_4",
            "run_phase_5",
            "run_phase_6",
            "run_phase_7",
            "run_complete",
        }
        assert expected_services.issubset(service_names), (
            f"Missing services: {expected_services - service_names}"
        )

        # Get service objects
        phase_services = {
            num: next(s for s in services if s.name == f"run_phase_{num}")
            for num in range(1, 8)
        }
        complete_service = next(s for s in services if s.name == "run_complete")

        try:
            # Phase 1: Component lifecycle
            client.execute_service(phase_services[1], {})
            await asyncio.wait_for(phase_futures[1], timeout=1.0)
            await asyncio.sleep(0.05)  # Let timeouts complete

            # Phase 2: Sensor polling
            client.execute_service(phase_services[2], {})
            await asyncio.wait_for(phase_futures[2], timeout=1.0)
            await asyncio.sleep(0.1)  # Let intervals run a bit

            # Phase 3: Communication patterns
            client.execute_service(phase_services[3], {})
            await asyncio.wait_for(phase_futures[3], timeout=1.0)
            await asyncio.sleep(0.1)  # Let heartbeat run

            # Phase 4: Defer patterns
            client.execute_service(phase_services[4], {})
            await asyncio.wait_for(phase_futures[4], timeout=1.0)
            await asyncio.sleep(0.2)  # Let everything settle and recycle

            # Phase 5: Pool reuse verification
            client.execute_service(phase_services[5], {})
            await asyncio.wait_for(phase_futures[5], timeout=1.0)
            await asyncio.sleep(0.1)  # Let Phase 5 timeouts complete and recycle

            # Phase 6: Full pool reuse verification
            client.execute_service(phase_services[6], {})
            await asyncio.wait_for(phase_futures[6], timeout=1.0)
            await asyncio.sleep(0.1)  # Let Phase 6 timeouts complete

            # Phase 7: Same-named defer optimization
            client.execute_service(phase_services[7], {})
            await asyncio.wait_for(phase_futures[7], timeout=1.0)
            await asyncio.sleep(0.05)  # Let the single defer execute

            # Complete test
            client.execute_service(complete_service, {})
            await asyncio.wait_for(test_complete_future, timeout=0.5)

        except TimeoutError as e:
            # Print debug info if test times out
            recent_logs = "\n".join(log_lines[-30:])
            phases_completed = [num for num, fut in phase_futures.items() if fut.done()]
            pytest.fail(
                f"Test timed out waiting for phase/completion. Error: {e}\n"
                f"  Phases completed: {phases_completed}\n"
                f"  Pool stats:\n"
                f"    Reuse count: {pool_reuse_count}\n"
                f"    Recycle count: {pool_recycle_count}\n"
                f"    Pool full count: {pool_full_count}\n"
                f"    New alloc count: {new_alloc_count}\n"
                f"Recent logs:\n{recent_logs}"
            )

        # Verify all test phases ran
        for phase_num in range(1, 8):
            assert phase_futures[phase_num].done(), f"Phase {phase_num} did not complete"

        # Verify pool behavior
        assert pool_recycle_count > 0, "Should have recycled items to pool"

        # Check pool metrics
        if pool_recycle_count > 0:
            max_pool_size = 0
            for line in log_lines:
                if match := recycle_pattern.search(line):
                    size = int(match.group(1))
                    max_pool_size = max(max_pool_size, size)

            # Pool can grow up to its maximum of 5
            assert max_pool_size <= 5, f"Pool grew beyond maximum ({max_pool_size})"

        # Log summary for debugging
        print("\nScheduler Pool Test Summary (Python Orchestrated):")
        print(f"  Items recycled to pool: {pool_recycle_count}")
        print(f"  Items reused from pool: {pool_reuse_count}")
        print(f"  Pool full events: {pool_full_count}")
        print(f"  New allocations: {new_alloc_count}")
        print("  All phases completed successfully")

        # Verify reuse happened
        if pool_reuse_count == 0 and pool_recycle_count > 3:
            pytest.fail("Pool had items recycled but none were reused")

        # Success - pool is working
        assert pool_recycle_count > 0 or new_alloc_count < 15, (
            "Pool should either recycle items or limit new allocations"
        )