diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp
index d0230d46fc..9a9e61d579 100644
--- a/esphome/core/scheduler.cpp
+++ b/esphome/core/scheduler.cpp
@@ -14,7 +14,20 @@ namespace esphome {
 
 static const char *const TAG = "scheduler";
 
-static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
+// Memory pool configuration constants
+// Pool size of 5 matches typical usage patterns (2-4 active timers)
+// - Minimal memory overhead (~250 bytes on ESP32)
+// - Sufficient for most configs with a couple sensors/components
+// - Still prevents heap fragmentation and allocation stalls
+// - Complex setups with many timers will just allocate beyond the pool
+// See https://github.com/esphome/backlog/issues/52
+static constexpr size_t MAX_POOL_SIZE = 5;
+
+// Maximum number of logically deleted (cancelled) items before forcing cleanup.
+// Set to 5 to match the pool size - when we have as many cancelled items as our
+// pool can hold, it's time to clean up and recycle them.
+static constexpr uint32_t MAX_LOGICALLY_DELETED_ITEMS = 5;
+
 // Half the 32-bit range - used to detect rollovers vs normal time progression
 static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
 // max delay to start an interval sequence
@@ -79,8 +92,28 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
     return;
   }
 
+  // Get fresh timestamp BEFORE taking lock - millis_64_ may need to acquire lock itself
+  const uint64_t now = this->millis_64_(millis());
+
+  // Take lock early to protect scheduler_item_pool_ access
+  LockGuard guard{this->lock_};
+
   // Create and populate the scheduler item
-  auto item = make_unique<SchedulerItem>();
+  std::unique_ptr<SchedulerItem> item;
+  if (!this->scheduler_item_pool_.empty()) {
+    // Reuse from pool
+    item = std::move(this->scheduler_item_pool_.back());
+    this->scheduler_item_pool_.pop_back();
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Reused item from pool (pool size now: %zu)", this->scheduler_item_pool_.size());
+#endif
+  } else {
+    // Allocate new if pool is empty
+    item = make_unique<SchedulerItem>();
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Allocated new item (pool empty)");
+#endif
+  }
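+  // In either case, item now owns a cleared SchedulerItem that is populated below.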
   item->component = component;
   item->set_name(name_cstr, !is_static_string);
   item->type = type;
@@ -99,7 +132,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   // Single-core platforms don't need thread-safe defer handling
   if (delay == 0 && type == SchedulerItem::TIMEOUT) {
     // Put in defer queue for guaranteed FIFO execution
-    LockGuard guard{this->lock_};
     if (!skip_cancel) {
       this->cancel_item_locked_(component, name_cstr, type);
     }
@@ -108,9 +140,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   }
 #endif /* not ESPHOME_THREAD_SINGLE */
 
-  // Get fresh timestamp for new timer/interval - ensures accurate scheduling
-  const auto now = this->millis_64_(millis());  // Fresh millis() call
-
   // Type-specific setup
   if (type == SchedulerItem::INTERVAL) {
     item->interval = delay;
@@ -142,8 +171,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
   }
 #endif /* ESPHOME_DEBUG_SCHEDULER */
 
-  LockGuard guard{this->lock_};
-
   // For retries, check if there's a cancelled timeout first
   if (is_retry && name_cstr != nullptr && type == SchedulerItem::TIMEOUT &&
       (has_cancelled_timeout_in_container_(this->items_, component, name_cstr, /* match_retry= */ true) ||
@@ -319,6 +346,8 @@ void HOT Scheduler::call(uint32_t now) {
       if (!this->should_skip_item_(item.get())) {
         this->execute_item_(item.get(), now);
       }
+      // Recycle the defer item after execution
+      this->recycle_item_(std::move(item));
     }
 #endif /* not ESPHOME_THREAD_SINGLE */
 
@@ -338,11 +367,11 @@ void HOT Scheduler::call(uint32_t now) {
 #ifdef ESPHOME_THREAD_MULTI_ATOMICS
     const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
     const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
-    ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
-             major_dbg, last_dbg);
+    ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
+             this->scheduler_item_pool_.size(), now_64, major_dbg, last_dbg);
 #else /* not ESPHOME_THREAD_MULTI_ATOMICS */
-    ESP_LOGD(TAG, "Items: count=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(), now_64,
-             this->millis_major_, this->last_millis_);
+    ESP_LOGD(TAG, "Items: count=%zu, pool=%zu, now=%" PRIu64 " (%" PRIu16 ", %" PRIu32 ")", this->items_.size(),
+             this->scheduler_item_pool_.size(), now_64, this->millis_major_, this->last_millis_);
 #endif /* else ESPHOME_THREAD_MULTI_ATOMICS */
     // Cleanup before debug output
     this->cleanup_();
@@ -355,9 +384,10 @@ void HOT Scheduler::call(uint32_t now) {
       }
 
       const char *name = item->get_name();
-      ESP_LOGD(TAG, "  %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64,
+      bool is_cancelled = is_item_removed_(item.get());
+      ESP_LOGD(TAG, "  %s '%s/%s' interval=%" PRIu32 " next_execution in %" PRIu64 "ms at %" PRIu64 "%s",
                item->get_type_str(), item->get_source(), name ? name : "(null)", item->interval,
-               item->next_execution_ - now_64, item->next_execution_);
+               item->next_execution_ - now_64, item->next_execution_, is_cancelled ? " [CANCELLED]" : "");
 
       old_items.push_back(std::move(item));
     }
@@ -372,8 +402,13 @@ void HOT Scheduler::call(uint32_t now) {
   }
 #endif /* ESPHOME_DEBUG_SCHEDULER */
 
-  // If we have too many items to remove
-  if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
+  // Cleanup removed items before processing
+  // First try to clean items from the top of the heap (fast path)
+  this->cleanup_();
+
+  // If we still have too many cancelled items, do a full cleanup
+  // This only happens if cancelled items are stuck in the middle/bottom of the heap
+  if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
     // We hold the lock for the entire cleanup operation because:
     // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
    // 2. Other threads must see either the old state or the new state, not intermediate states
@@ -383,10 +418,13 @@ void HOT Scheduler::call(uint32_t now) {
 
     std::vector<std::unique_ptr<SchedulerItem>> valid_items;
 
-    // Move all non-removed items to valid_items
+    // Move all non-removed items to valid_items, recycle removed ones
     for (auto &item : this->items_) {
-      if (!item->remove) {
+      if (!is_item_removed_(item.get())) {
        valid_items.push_back(std::move(item));
+      } else {
+        // Recycle removed items
+        this->recycle_item_(std::move(item));
       }
     }
 
@@ -396,9 +434,6 @@ void HOT Scheduler::call(uint32_t now) {
     std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
     this->to_remove_ = 0;
   }
-
-  // Cleanup removed items before processing
-  this->cleanup_();
 
   while (!this->items_.empty()) {
     // use scoping to indicate visibility of `item` variable
     {
@@ -472,6 +507,9 @@ void HOT Scheduler::call(uint32_t now) {
         // Add new item directly to to_add_
         // since we have the lock held
         this->to_add_.push_back(std::move(item));
+      } else {
+        // Timeout completed - recycle it
+        this->recycle_item_(std::move(item));
       }
 
       has_added_items |= !this->to_add_.empty();
@@ -485,7 +523,9 @@ void HOT Scheduler::call(uint32_t now) {
 void HOT Scheduler::process_to_add() {
   LockGuard guard{this->lock_};
   for (auto &it : this->to_add_) {
-    if (it->remove) {
+    if (is_item_removed_(it.get())) {
+      // Recycle cancelled items
+      this->recycle_item_(std::move(it));
       continue;
     }
@@ -525,6 +565,10 @@ size_t HOT Scheduler::cleanup_() {
 }
 void HOT Scheduler::pop_raw_() {
   std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
+
+  // Instead of destroying, recycle the item
+  this->recycle_item_(std::move(this->items_.back()));
+
   this->items_.pop_back();
 }
@@ -559,7 +603,7 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 
   // Check all containers for matching items
 #ifndef ESPHOME_THREAD_SINGLE
-  // Only check defer queue for timeouts (intervals never go there)
+  // Mark items in defer queue as cancelled (they'll be skipped when processed)
   if (type == SchedulerItem::TIMEOUT) {
     for (auto &item : this->defer_queue_) {
       if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
@@ -571,11 +615,22 @@ bool HOT Scheduler::cancel_item_locked_(Component *component, const char *name_c
 #endif /* not ESPHOME_THREAD_SINGLE */
 
   // Cancel items in the main heap
-  for (auto &item : this->items_) {
-    if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
-      this->mark_item_removed_(item.get());
+  // Special case: if the last item in the heap matches, we can remove it immediately
+  // (removing the last element doesn't break the heap structure)
+  if (!this->items_.empty()) {
+    auto &last_item = this->items_.back();
+    if (this->matches_item_(last_item, component, name_cstr, type, match_retry)) {
+      this->recycle_item_(std::move(this->items_.back()));
+      this->items_.pop_back();
       total_cancelled++;
-      this->to_remove_++;  // Track removals for heap items
+    }
+    // For the other items in the heap, we can only mark them for removal
+    // (we can't remove from the middle of a heap)
+    for (auto &item : this->items_) {
+      if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
+        this->mark_item_removed_(item.get());
+        total_cancelled++;
+        this->to_remove_++;  // Track removals for heap items
+      }
     }
   }
 
@@ -754,4 +809,25 @@ bool HOT Scheduler::SchedulerItem::cmp(const std::unique_ptr<SchedulerItem> &a,
   return a->next_execution_ > b->next_execution_;
 }
 
+void Scheduler::recycle_item_(std::unique_ptr<SchedulerItem> item) {
+  if (!item)
+    return;
+
+  if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
+    // Clear callback to release captured resources
+    item->callback = nullptr;
+    // Clear dynamic name if any
+    item->clear_dynamic_name();
+    this->scheduler_item_pool_.push_back(std::move(item));
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Recycled item to pool (pool size now: %zu)", this->scheduler_item_pool_.size());
+#endif
+  } else {
+#ifdef ESPHOME_DEBUG_SCHEDULER
+    ESP_LOGD(TAG, "Pool full (size: %zu), deleting item", this->scheduler_item_pool_.size());
+#endif
+  }
+  // else: the unique_ptr deletes the item when it goes out of scope
+}
+
 }  // namespace esphome
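---- 8< ---- reviewer note (illustration, not part of the patch) ----
The cancel fast path in cancel_item_locked_() leans on a standard property of
binary heaps: in the vector layout, the last element is always a leaf, so
popping items_.back() can never violate any parent/child ordering. A minimal
self-contained demonstration, assuming nothing beyond the C++ standard library:

#include <algorithm>
#include <cassert>
#include <vector>

int main() {
  std::vector<int> heap{9, 5, 7, 1, 3};  // a valid max-heap: 9 >= {5, 7}, 5 >= {1, 3}
  assert(std::is_heap(heap.begin(), heap.end()));
  heap.pop_back();  // removes a leaf (the 3)...
  assert(std::is_heap(heap.begin(), heap.end()));  // ...so the heap property still holds
  return 0;
}

Removing from the middle would instead require re-heapifying, which is why all
other matches are only marked and swept later by cleanup_().
---- >8 ----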
diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h
index f469a60d5c..85cfaab2e0 100644
--- a/esphome/core/scheduler.h
+++ b/esphome/core/scheduler.h
@@ -142,11 +142,7 @@ class Scheduler {
     }
 
     // Destructor to clean up dynamic names
-    ~SchedulerItem() {
-      if (name_is_dynamic) {
-        delete[] name_.dynamic_name;
-      }
-    }
+    ~SchedulerItem() { clear_dynamic_name(); }
 
     // Delete copy operations to prevent accidental copies
     SchedulerItem(const SchedulerItem &) = delete;
@@ -159,13 +155,19 @@ class Scheduler {
     // Helper to get the name regardless of storage type
     const char *get_name() const { return name_is_dynamic ? name_.dynamic_name : name_.static_name; }
 
+    // Helper to clear dynamic name if allocated
+    void clear_dynamic_name() {
+      if (name_is_dynamic && name_.dynamic_name) {
+        delete[] name_.dynamic_name;
+        name_.dynamic_name = nullptr;
+        name_is_dynamic = false;
+      }
+    }
+
     // Helper to set name with proper ownership
     void set_name(const char *name, bool make_copy = false) {
       // Clean up old dynamic name if any
-      if (name_is_dynamic && name_.dynamic_name) {
-        delete[] name_.dynamic_name;
-        name_is_dynamic = false;
-      }
+      clear_dynamic_name();
 
       if (!name) {
         // nullptr case - no name provided
@@ -214,6 +216,15 @@ class Scheduler {
   // Common implementation for cancel operations
   bool cancel_item_(Component *component, bool is_static_string, const void *name_ptr, SchedulerItem::Type type);
 
+  // Helper to check if two scheduler item names match
+  inline bool HOT names_match_(const char *name1, const char *name2) const {
+    // Check pointer equality first (common for static strings), then string contents.
+    // The core ESPHome codebase uses static strings (const char *) for component names,
+    // making pointer comparison effective. The std::string overloads exist only for
+    // compatibility with external components but are rarely used in practice.
+    return (name1 != nullptr && name2 != nullptr) && ((name1 == name2) || (strcmp(name1, name2) == 0));
+  }
+
   // Helper function to check if item matches criteria for cancellation
   inline bool HOT matches_item_(const std::unique_ptr<SchedulerItem> &item, Component *component,
                                 const char *name_cstr, SchedulerItem::Type type, bool match_retry,
                                 bool skip_removed = true) const {
@@ -221,29 +232,20 @@ class Scheduler {
         (match_retry && !item->is_retry)) {
       return false;
     }
-    const char *item_name = item->get_name();
-    if (item_name == nullptr) {
-      return false;
-    }
-    // Fast path: if pointers are equal
-    // This is effective because the core ESPHome codebase uses static strings (const char*)
-    // for component names. The std::string overloads exist only for compatibility with
-    // external components, but are rarely used in practice.
-    if (item_name == name_cstr) {
-      return true;
-    }
-    // Slow path: compare string contents
-    return strcmp(name_cstr, item_name) == 0;
+    return this->names_match_(item->get_name(), name_cstr);
   }
 
   // Helper to execute a scheduler item
   void execute_item_(SchedulerItem *item, uint32_t now);
 
   // Helper to check if item should be skipped
-  bool should_skip_item_(const SchedulerItem *item) const {
-    return item->remove || (item->component != nullptr && item->component->is_failed());
+  bool should_skip_item_(SchedulerItem *item) const {
+    return is_item_removed_(item) || (item->component != nullptr && item->component->is_failed());
   }
 
+  // Helper to recycle a SchedulerItem
+  void recycle_item_(std::unique_ptr<SchedulerItem> item);
+
   // Helper to check if item is marked for removal (platform-specific)
   // Returns true if item should be skipped, handles platform-specific synchronization
   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
@@ -280,8 +282,9 @@ class Scheduler {
   bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
                                            bool match_retry) const {
     for (const auto &item : container) {
-      if (item->remove && this->matches_item_(item, component, name_cstr, SchedulerItem::TIMEOUT, match_retry,
-                                              /* skip_removed= */ false)) {
+      if (is_item_removed_(item.get()) &&
+          this->matches_item_(item, component, name_cstr, SchedulerItem::TIMEOUT, match_retry,
+                              /* skip_removed= */ false)) {
         return true;
       }
     }
@@ -297,6 +300,16 @@ class Scheduler {
 #endif /* ESPHOME_THREAD_SINGLE */
   uint32_t to_remove_{0};
 
+  // Memory pool for recycling SchedulerItem objects to reduce heap churn.
+  // Design decisions:
+  // - std::vector is used instead of a fixed array because many systems only need 1-2 scheduler items
+  // - The vector grows dynamically up to MAX_POOL_SIZE (5) only when needed, saving memory on simple setups
+  // - Pool size of 5 matches typical usage (2-4 timers) while keeping memory overhead low (~250 bytes on ESP32)
+  // - The pool significantly reduces heap fragmentation, which is critical because heap allocation/deallocation
+  //   can stall the entire system, causing timing issues and dropped events for any components that need
+  //   to synchronize between tasks (see https://github.com/esphome/backlog/issues/52)
+  std::vector<std::unique_ptr<SchedulerItem>> scheduler_item_pool_;
+
 #ifdef ESPHOME_THREAD_MULTI_ATOMICS
   /*
    * Multi-threaded platforms with atomic support: last_millis_ needs atomic for lock-free updates
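---- 8< ---- reviewer note (illustration, not part of the patch) ----
Stripped of the scheduler specifics, scheduler_item_pool_ is a bounded LIFO
free list. A minimal sketch of the same pattern; Item, ItemPool, acquire and
release are hypothetical names standing in for SchedulerItem, Scheduler,
set_timer_common_ and recycle_item_:

#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

struct Item {
  std::function<void()> callback;  // owns whatever the callback captured
};

class ItemPool {
 public:
  // O(1) reuse of the most recently recycled item; falls back to the heap
  // only when the pool is empty (mirrors set_timer_common_).
  std::unique_ptr<Item> acquire() {
    if (!free_list_.empty()) {
      std::unique_ptr<Item> item = std::move(free_list_.back());
      free_list_.pop_back();
      return item;
    }
    return std::make_unique<Item>();
  }

  // Captured resources are released immediately, and the pool is bounded so
  // a burst of timers cannot pin memory forever (mirrors recycle_item_).
  void release(std::unique_ptr<Item> item) {
    if (!item)
      return;
    item->callback = nullptr;
    if (free_list_.size() < kMaxPoolSize) {
      free_list_.push_back(std::move(item));
    }
    // else: the unique_ptr destroys the item here, keeping the pool at its cap
  }

 private:
  static constexpr std::size_t kMaxPoolSize = 5;
  std::vector<std::unique_ptr<Item>> free_list_;
};

The LIFO order also helps cache warmth: the most recently retired item is the
first one handed out again.
---- >8 ----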
diff --git a/tests/integration/fixtures/scheduler_pool.yaml b/tests/integration/fixtures/scheduler_pool.yaml
new file mode 100644
index 0000000000..5389125188
--- /dev/null
+++ b/tests/integration/fixtures/scheduler_pool.yaml
@@ -0,0 +1,282 @@
+esphome:
+  name: scheduler-pool-test
+  on_boot:
+    priority: -100
+    then:
+      - logger.log: "Starting scheduler pool tests"
+  debug_scheduler: true  # Enable scheduler debug logging
+
+host:
+
+api:
+  services:
+    - service: run_phase_1
+      then:
+        - script.execute: test_pool_recycling
+    - service: run_phase_2
+      then:
+        - script.execute: test_sensor_polling
+    - service: run_phase_3
+      then:
+        - script.execute: test_communication_patterns
+    - service: run_phase_4
+      then:
+        - script.execute: test_defer_patterns
+    - service: run_phase_5
+      then:
+        - script.execute: test_pool_reuse_verification
+    - service: run_phase_6
+      then:
+        - script.execute: test_full_pool_reuse
+    - service: run_phase_7
+      then:
+        - script.execute: test_same_defer_optimization
+    - service: run_complete
+      then:
+        - script.execute: complete_test
+
+logger:
+  level: VERY_VERBOSE  # Need VERY_VERBOSE to see pool debug messages
+
+globals:
+  - id: create_count
+    type: int
+    initial_value: '0'
+  - id: cancel_count
+    type: int
+    initial_value: '0'
+  - id: interval_counter
+    type: int
+    initial_value: '0'
+  - id: pool_test_done
+    type: bool
+    initial_value: 'false'
+
+script:
+  - id: test_pool_recycling
+    then:
+      - logger.log: "Testing scheduler pool recycling with realistic usage patterns"
+      - lambda: |-
+          auto *component = id(test_sensor);
+
+          // Simulate realistic component behavior with timeouts that complete naturally
+          ESP_LOGI("test", "Phase 1: Simulating normal component lifecycle");
+
+          // Sensor update timeouts (common pattern)
+          App.scheduler.set_timeout(component, "sensor_init", 10, []() {
+            ESP_LOGD("test", "Sensor initialized");
+            id(create_count)++;
+          });
+
+          // Retry timeout (gets cancelled if successful)
+          App.scheduler.set_timeout(component, "retry_timeout", 50, []() {
+            ESP_LOGD("test", "Retry timeout executed");
+            id(create_count)++;
+          });
+
+          // Simulate successful operation - cancel the retry
+          App.scheduler.set_timeout(component, "success_sim", 20, []() {
+            ESP_LOGD("test", "Operation succeeded, cancelling retry");
+            App.scheduler.cancel_timeout(id(test_sensor), "retry_timeout");
+            id(cancel_count)++;
+          });
+
+          id(create_count) += 3;
+          ESP_LOGI("test", "Phase 1 complete");
+
+  - id: test_sensor_polling
+    then:
+      - lambda: |-
+          // Simulate sensor polling pattern
+          ESP_LOGI("test", "Phase 2: Simulating sensor polling patterns");
+          auto *component = id(test_sensor);
+
+          // Multiple sensors with different update intervals.
+          // These should only allocate once and reuse the same item for each interval execution
+          App.scheduler.set_interval(component, "temp_sensor", 10, []() {
+            ESP_LOGD("test", "Temperature sensor update");
+            id(interval_counter)++;
+            if (id(interval_counter) >= 3) {
+              App.scheduler.cancel_interval(id(test_sensor), "temp_sensor");
+              ESP_LOGD("test", "Temperature sensor stopped");
+            }
+          });
+
+          App.scheduler.set_interval(component, "humidity_sensor", 15, []() {
+            ESP_LOGD("test", "Humidity sensor update");
+            id(interval_counter)++;
+            if (id(interval_counter) >= 5) {
+              App.scheduler.cancel_interval(id(test_sensor), "humidity_sensor");
+              ESP_LOGD("test", "Humidity sensor stopped");
+            }
+          });
+
+          // Only 2 allocations for the intervals, no matter how many times they execute
+          id(create_count) += 2;
+          ESP_LOGD("test", "Created 2 intervals - they will reuse the same items for each execution");
+          ESP_LOGI("test", "Phase 2 complete");
+
+  - id: test_communication_patterns
+    then:
+      - lambda: |-
+          // Simulate communication patterns (WiFi/API reconnects, etc.)
+          ESP_LOGI("test", "Phase 3: Simulating communication patterns");
+          auto *component = id(test_sensor);
+
+          // Connection timeout pattern
+          App.scheduler.set_timeout(component, "connect_timeout", 200, []() {
+            ESP_LOGD("test", "Connection timeout - would retry");
+            id(create_count)++;
+
+            // Schedule a retry
+            App.scheduler.set_timeout(id(test_sensor), "connect_retry", 100, []() {
+              ESP_LOGD("test", "Retrying connection");
+              id(create_count)++;
+            });
+          });
+
+          // Heartbeat pattern
+          App.scheduler.set_interval(component, "heartbeat", 50, []() {
+            ESP_LOGD("test", "Heartbeat");
+            id(interval_counter)++;
+            if (id(interval_counter) >= 10) {
+              App.scheduler.cancel_interval(id(test_sensor), "heartbeat");
+              ESP_LOGD("test", "Heartbeat stopped");
+            }
+          });
+
+          id(create_count) += 2;
+          ESP_LOGI("test", "Phase 3 complete");
+
+  - id: test_defer_patterns
+    then:
+      - lambda: |-
+          // Simulate defer patterns (state changes, async operations)
+          ESP_LOGI("test", "Phase 4: Simulating heavy defer patterns like ratgdo");
+
+          auto *component = id(test_sensor);
+
+          // Simulate a burst of defer operations like ratgdo does with state updates.
+          // These should execute immediately and recycle quickly to the pool
+          for (int i = 0; i < 10; i++) {
+            std::string defer_name = "defer_" + std::to_string(i);
+            App.scheduler.set_timeout(component, defer_name, 0, [i]() {
+              ESP_LOGD("test", "Defer %d executed", i);
+              // Force a small delay between defer executions to see recycling
+              if (i == 5) {
+                ESP_LOGI("test", "Half of defers executed, checking pool status");
+              }
+            });
+          }
+
+          id(create_count) += 10;
+          ESP_LOGD("test", "Created 10 defer operations (0ms timeouts)");
+
+          // Also create some named defers that might get replaced
+          App.scheduler.set_timeout(component, "state_update", 0, []() {
+            ESP_LOGD("test", "State update 1");
+          });
+
+          // Replace the same named defer (should cancel the previous one)
+          App.scheduler.set_timeout(component, "state_update", 0, []() {
+            ESP_LOGD("test", "State update 2 (replaced)");
+          });
+
+          id(create_count) += 2;
+          id(cancel_count) += 1;  // One cancelled due to replacement
+
+          ESP_LOGI("test", "Phase 4 complete");
+
+  - id: test_pool_reuse_verification
+    then:
+      - lambda: |-
+          ESP_LOGI("test", "Phase 5: Verifying pool reuse after everything settles");
+
+          // Cancel any remaining intervals
+          auto *component = id(test_sensor);
+          App.scheduler.cancel_interval(component, "temp_sensor");
+          App.scheduler.cancel_interval(component, "humidity_sensor");
+          App.scheduler.cancel_interval(component, "heartbeat");
+
+          ESP_LOGD("test", "Cancelled any remaining intervals");
+
+          // The pool should have items from completed timeouts in earlier phases.
+          // Phase 1 had 3 timeouts that completed and were recycled.
+          // Phase 3 had 1 timeout that completed and was recycled.
+          // Phase 4 had 3 defers that completed and were recycled.
+          // So we should have a decent pool size already from naturally completed items.
+
+          // Now create 8 new timeouts - they should reuse from the pool when available
+          int reuse_test_count = 8;
+
+          for (int i = 0; i < reuse_test_count; i++) {
+            std::string name = "reuse_test_" + std::to_string(i);
+            App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
+              ESP_LOGD("test", "Reuse test %d completed", i);
+            });
+          }
+
+          ESP_LOGI("test", "Created %d items for reuse verification", reuse_test_count);
+          id(create_count) += reuse_test_count;
+          ESP_LOGI("test", "Phase 5 complete");
+
+  - id: test_full_pool_reuse
+    then:
+      - lambda: |-
+          ESP_LOGI("test", "Phase 6: Testing pool size limits after Phase 5 items complete");
+
+          // At this point, all Phase 5 timeouts should have completed and been recycled.
+          // The pool should be at its maximum size (5).
+          // Creating 10 new items tests that:
+          // - the first 5 items reuse from the pool
+          // - the remaining 5 items allocate new (pool empty)
+          // - the pool doesn't grow beyond MAX_POOL_SIZE of 5
+
+          auto *component = id(test_sensor);
+          int full_reuse_count = 10;
+
+          for (int i = 0; i < full_reuse_count; i++) {
+            std::string name = "full_reuse_" + std::to_string(i);
+            App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
+              ESP_LOGD("test", "Full reuse test %d completed", i);
+            });
+          }
+
+          ESP_LOGI("test", "Created %d items for full pool reuse verification", full_reuse_count);
+          id(create_count) += full_reuse_count;
+          ESP_LOGI("test", "Phase 6 complete");
+
+  - id: test_same_defer_optimization
+    then:
+      - lambda: |-
+          ESP_LOGI("test", "Phase 7: Testing same-named defer optimization");
+
+          auto *component = id(test_sensor);
+
+          // Create 10 defers with the same name - should optimize to updating the callback in place.
+          // This pattern is common in components like ratgdo that repeatedly defer state updates
+          for (int i = 0; i < 10; i++) {
+            App.scheduler.set_timeout(component, "repeated_defer", 0, [i]() {
+              ESP_LOGD("test", "Repeated defer executed with value: %d", i);
+            });
+          }
+
+          // Only the first should allocate; the rest should update in place.
+          // We expect only 1 allocation for all 10 operations
+          id(create_count) += 1;  // Only count 1 since the others should be optimized
+
+          ESP_LOGD("test", "Created 10 same-named defers (should only allocate once)");
+          ESP_LOGI("test", "Phase 7 complete");
+
+  - id: complete_test
+    then:
+      - lambda: |-
+          ESP_LOGI("test", "Pool recycling test complete - created %d items, cancelled %d, intervals %d",
+                   id(create_count), id(cancel_count), id(interval_counter));
+
+sensor:
+  - platform: template
+    name: Test Sensor
+    id: test_sensor
+    lambda: return 1.0;
+    update_interval: never
+
+# No interval - tests will be triggered from Python via API services
diff --git a/tests/integration/test_scheduler_pool.py b/tests/integration/test_scheduler_pool.py
new file mode 100644
index 0000000000..b5f9f12631
--- /dev/null
+++ b/tests/integration/test_scheduler_pool.py
@@ -0,0 +1,209 @@
+"""Integration test for scheduler memory pool functionality."""
+
+from __future__ import annotations
+
+import asyncio
+import re
+
+import pytest
+
+from .types import APIClientConnectedFactory, RunCompiledFunction
+
+
+@pytest.mark.asyncio
+async def test_scheduler_pool(
+    yaml_config: str,
+    run_compiled: RunCompiledFunction,
+    api_client_connected: APIClientConnectedFactory,
+) -> None:
+    """Test that the scheduler memory pool is working correctly with realistic usage.
+
+    This test simulates real-world scheduler usage patterns and verifies that:
+    1. Items are recycled to the pool when timeouts complete naturally
+    2. Items are recycled when intervals/timeouts are cancelled
+    3. Items are reused from the pool for new scheduler operations
+    4. The pool grows gradually based on actual usage patterns
+    5. Pool operations are logged correctly with debug scheduler enabled
+    """
+    # Track log messages to verify pool behavior
+    log_lines: list[str] = []
+    pool_reuse_count = 0
+    pool_recycle_count = 0
+    pool_full_count = 0
+    new_alloc_count = 0
+
+    # Patterns to match pool operations
+    reuse_pattern = re.compile(r"Reused item from pool \(pool size now: (\d+)\)")
+    recycle_pattern = re.compile(r"Recycled item to pool \(pool size now: (\d+)\)")
+    pool_full_pattern = re.compile(r"Pool full \(size: (\d+)\), deleting item")
+    new_alloc_pattern = re.compile(r"Allocated new item \(pool empty\)")
+
+    # Futures to track when test phases complete
+    loop = asyncio.get_running_loop()
+    test_complete_future: asyncio.Future[bool] = loop.create_future()
+    phase_futures = {
+        1: loop.create_future(),
+        2: loop.create_future(),
+        3: loop.create_future(),
+        4: loop.create_future(),
+        5: loop.create_future(),
+        6: loop.create_future(),
+        7: loop.create_future(),
+    }
+
+    def check_output(line: str) -> None:
+        """Check log output for pool operations and phase completion."""
+        nonlocal pool_reuse_count, pool_recycle_count, pool_full_count, new_alloc_count
+        log_lines.append(line)
+
+        # Track pool operations
+        if reuse_pattern.search(line):
+            pool_reuse_count += 1
+        elif recycle_pattern.search(line):
+            pool_recycle_count += 1
+        elif pool_full_pattern.search(line):
+            pool_full_count += 1
+        elif new_alloc_pattern.search(line):
+            new_alloc_count += 1
+
+        # Track phase completion
+        for phase_num in range(1, 8):
+            if (
+                f"Phase {phase_num} complete" in line
+                and phase_num in phase_futures
+                and not phase_futures[phase_num].done()
+            ):
+                phase_futures[phase_num].set_result(True)
+
+        # Check for test completion
+        if "Pool recycling test complete" in line and not test_complete_future.done():
+            test_complete_future.set_result(True)
+
+    # Run the test with log monitoring
+    async with (
+        run_compiled(yaml_config, line_callback=check_output),
+        api_client_connected() as client,
+    ):
+        # Verify the device is running
+        device_info = await client.device_info()
+        assert device_info is not None
+        assert device_info.name == "scheduler-pool-test"
+
+        # Get the list of services
+        entities, services = await client.list_entities_services()
+        service_names = {s.name for s in services}
+
+        # Verify all test services are available
+        expected_services = {
+            "run_phase_1",
+            "run_phase_2",
+            "run_phase_3",
+            "run_phase_4",
+            "run_phase_5",
+            "run_phase_6",
+            "run_phase_7",
+            "run_complete",
+        }
+        assert expected_services.issubset(service_names), (
+            f"Missing services: {expected_services - service_names}"
+        )
+
+        # Get service objects
+        phase_services = {
+            num: next(s for s in services if s.name == f"run_phase_{num}")
+            for num in range(1, 8)
+        }
+        complete_service = next(s for s in services if s.name == "run_complete")
+
+        try:
+            # Phase 1: Component lifecycle
+            client.execute_service(phase_services[1], {})
+            await asyncio.wait_for(phase_futures[1], timeout=1.0)
+            await asyncio.sleep(0.05)  # Let timeouts complete
+
+            # Phase 2: Sensor polling
+            client.execute_service(phase_services[2], {})
+            await asyncio.wait_for(phase_futures[2], timeout=1.0)
+            await asyncio.sleep(0.1)  # Let intervals run a bit
+
+            # Phase 3: Communication patterns
+            client.execute_service(phase_services[3], {})
+            await asyncio.wait_for(phase_futures[3], timeout=1.0)
+            await asyncio.sleep(0.1)  # Let the heartbeat run
+
+            # Phase 4: Defer patterns
+            client.execute_service(phase_services[4], {})
+            await asyncio.wait_for(phase_futures[4], timeout=1.0)
+            await asyncio.sleep(0.2)  # Let everything settle and recycle
+
+            # Phase 5: Pool reuse verification
+            client.execute_service(phase_services[5], {})
+            await asyncio.wait_for(phase_futures[5], timeout=1.0)
+            await asyncio.sleep(0.1)  # Let Phase 5 timeouts complete and recycle
+
+            # Phase 6: Full pool reuse verification
+            client.execute_service(phase_services[6], {})
+            await asyncio.wait_for(phase_futures[6], timeout=1.0)
+            await asyncio.sleep(0.1)  # Let Phase 6 timeouts complete
+
+            # Phase 7: Same-named defer optimization
+            client.execute_service(phase_services[7], {})
+            await asyncio.wait_for(phase_futures[7], timeout=1.0)
+            await asyncio.sleep(0.05)  # Let the single defer execute
+
+            # Complete the test
+            client.execute_service(complete_service, {})
+            await asyncio.wait_for(test_complete_future, timeout=0.5)
+
+        except TimeoutError as e:
+            # Print debug info if the test times out
+            recent_logs = "\n".join(log_lines[-30:])
+            phases_completed = [num for num, fut in phase_futures.items() if fut.done()]
+            pytest.fail(
+                f"Test timed out waiting for phase/completion. Error: {e}\n"
+                f"  Phases completed: {phases_completed}\n"
+                f"  Pool stats:\n"
+                f"    Reuse count: {pool_reuse_count}\n"
+                f"    Recycle count: {pool_recycle_count}\n"
+                f"    Pool full count: {pool_full_count}\n"
+                f"    New alloc count: {new_alloc_count}\n"
+                f"Recent logs:\n{recent_logs}"
+            )
+
+        # Verify all test phases ran
+        for phase_num in range(1, 8):
+            assert phase_futures[phase_num].done(), f"Phase {phase_num} did not complete"
+
+        # Verify pool behavior
+        assert pool_recycle_count > 0, "Should have recycled items to pool"
+
+        # Check pool metrics
+        if pool_recycle_count > 0:
+            max_pool_size = 0
+            for line in log_lines:
+                if match := recycle_pattern.search(line):
+                    size = int(match.group(1))
+                    max_pool_size = max(max_pool_size, size)
+
+            # The pool can grow up to its maximum of 5
+            assert max_pool_size <= 5, f"Pool grew beyond maximum ({max_pool_size})"
+
+        # Log a summary for debugging
+        print("\nScheduler Pool Test Summary (Python Orchestrated):")
+        print(f"  Items recycled to pool: {pool_recycle_count}")
+        print(f"  Items reused from pool: {pool_reuse_count}")
+        print(f"  Pool full events: {pool_full_count}")
+        print(f"  New allocations: {new_alloc_count}")
+        print("  All phases completed successfully")
+
+        # Verify reuse happened
+        if pool_reuse_count == 0 and pool_recycle_count > 3:
+            pytest.fail("Pool had items recycled but none were reused")
+
+        # Success - the pool is working
+        assert pool_recycle_count > 0 or new_alloc_count < 15, (
+            "Pool should either recycle items or limit new allocations"
+        )
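---- 8< ---- reviewer note (illustration, not part of the patch) ----
From a component author's point of view the pool is transparent: the public
scheduling helpers are unchanged, and recycling happens behind them. A rough
sketch of the kind of churn the fixture above drives, using the protected
Component helpers already exercised by the YAML lambdas (the component and
method names here are hypothetical):

#include "esphome/core/component.h"

namespace esphome {

class PollingExample : public Component {
 public:
  void setup() override {
    // When this timeout fires and completes, its SchedulerItem is recycled
    // into the pool instead of being freed.
    this->set_timeout("init", 10, [this]() { this->start_polling_(); });
  }

 protected:
  void start_polling_() {
    // An interval re-arms itself by reusing its own item, so steady-state
    // polling causes no further heap allocations.
    this->set_interval("poll", 1000, [this]() { this->poll_(); });
  }

  void poll_() {
    if (++this->polls_ >= 10) {
      // Cancellation also routes the item back into the pool.
      this->cancel_interval("poll");
    }
  }

  int polls_{0};
};

}  // namespace esphome
---- >8 ----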