mirror of
https://github.com/esphome/esphome.git
synced 2025-09-16 10:12:21 +01:00
Merge branch 'scheduler_pool_v2' into integration
This commit is contained in:
@@ -1135,7 +1135,7 @@ void ExecuteServiceArgument::dump_to(std::string &out) const {
|
|||||||
dump_field(out, "string_", this->string_);
|
dump_field(out, "string_", this->string_);
|
||||||
dump_field(out, "int_", this->int_);
|
dump_field(out, "int_", this->int_);
|
||||||
for (const auto it : this->bool_array) {
|
for (const auto it : this->bool_array) {
|
||||||
dump_field(out, "bool_array", it, 4);
|
dump_field(out, "bool_array", static_cast<bool>(it), 4);
|
||||||
}
|
}
|
||||||
for (const auto &it : this->int_array) {
|
for (const auto &it : this->int_array) {
|
||||||
dump_field(out, "int_array", it, 4);
|
dump_field(out, "int_array", it, 4);
|
||||||
|
@@ -14,7 +14,23 @@ namespace esphome {
|
|||||||
|
|
||||||
static const char *const TAG = "scheduler";
|
static const char *const TAG = "scheduler";
|
||||||
|
|
||||||
static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 10;
|
// Memory pool configuration constants
|
||||||
|
// Pool size of 10 is a balance between memory usage and performance:
|
||||||
|
// - Small enough to not waste memory on simple configs (1-2 timers)
|
||||||
|
// - Large enough to handle complex setups with multiple sensors/components
|
||||||
|
// - Prevents system-wide stalls from heap allocation/deallocation that can
|
||||||
|
// disrupt task synchronization and cause dropped events
|
||||||
|
static constexpr size_t MAX_POOL_SIZE = 10;
|
||||||
|
// Maximum number of cancelled items to keep in the heap before forcing a cleanup.
|
||||||
|
// Set to 6 to trigger cleanup relatively frequently, ensuring cancelled items are
|
||||||
|
// recycled to the pool in a timely manner to maintain pool efficiency.
|
||||||
|
static const uint32_t MAX_LOGICALLY_DELETED_ITEMS = 6;
|
||||||
|
|
||||||
|
// Ensure MAX_LOGICALLY_DELETED_ITEMS is at least 4 smaller than MAX_POOL_SIZE
|
||||||
|
// This guarantees we have room in the pool for recycled items when cleanup occurs
|
||||||
|
static_assert(MAX_LOGICALLY_DELETED_ITEMS + 4 <= MAX_POOL_SIZE,
|
||||||
|
"MAX_LOGICALLY_DELETED_ITEMS must be at least 4 smaller than MAX_POOL_SIZE");
|
||||||
|
|
||||||
// Half the 32-bit range - used to detect rollovers vs normal time progression
|
// Half the 32-bit range - used to detect rollovers vs normal time progression
|
||||||
static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
|
static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
|
||||||
// max delay to start an interval sequence
|
// max delay to start an interval sequence
|
||||||
@@ -91,9 +107,15 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type
|
|||||||
// Reuse from pool
|
// Reuse from pool
|
||||||
item = std::move(this->scheduler_item_pool_.back());
|
item = std::move(this->scheduler_item_pool_.back());
|
||||||
this->scheduler_item_pool_.pop_back();
|
this->scheduler_item_pool_.pop_back();
|
||||||
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
|
ESP_LOGVV(TAG, "Reused item from pool (pool size now: %zu)", this->scheduler_item_pool_.size());
|
||||||
|
#endif
|
||||||
} else {
|
} else {
|
||||||
// Allocate new if pool is empty
|
// Allocate new if pool is empty
|
||||||
item = make_unique<SchedulerItem>();
|
item = make_unique<SchedulerItem>();
|
||||||
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
|
ESP_LOGVV(TAG, "Allocated new item (pool empty)");
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
item->component = component;
|
item->component = component;
|
||||||
item->set_name(name_cstr, !is_static_string);
|
item->set_name(name_cstr, !is_static_string);
|
||||||
@@ -327,6 +349,8 @@ void HOT Scheduler::call(uint32_t now) {
|
|||||||
if (!this->should_skip_item_(item.get())) {
|
if (!this->should_skip_item_(item.get())) {
|
||||||
this->execute_item_(item.get(), now);
|
this->execute_item_(item.get(), now);
|
||||||
}
|
}
|
||||||
|
// Recycle the defer item after execution
|
||||||
|
this->recycle_item_(std::move(item));
|
||||||
}
|
}
|
||||||
#endif /* not ESPHOME_THREAD_SINGLE */
|
#endif /* not ESPHOME_THREAD_SINGLE */
|
||||||
|
|
||||||
@@ -759,18 +783,19 @@ void Scheduler::recycle_item_(std::unique_ptr<SchedulerItem> item) {
|
|||||||
if (!item)
|
if (!item)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Pool size of 8 is a balance between memory usage and performance:
|
|
||||||
// - Small enough to not waste memory on simple configs (1-2 timers)
|
|
||||||
// - Large enough to handle complex setups with multiple sensors/components
|
|
||||||
// - Prevents system-wide stalls from heap allocation/deallocation that can
|
|
||||||
// disrupt task synchronization and cause dropped events
|
|
||||||
static constexpr size_t MAX_POOL_SIZE = 8;
|
|
||||||
if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
|
if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
|
||||||
// Clear callback to release captured resources
|
// Clear callback to release captured resources
|
||||||
item->callback = nullptr;
|
item->callback = nullptr;
|
||||||
// Clear dynamic name if any
|
// Clear dynamic name if any
|
||||||
item->clear_dynamic_name();
|
item->clear_dynamic_name();
|
||||||
this->scheduler_item_pool_.push_back(std::move(item));
|
this->scheduler_item_pool_.push_back(std::move(item));
|
||||||
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
|
ESP_LOGVV(TAG, "Recycled item to pool (pool size now: %zu)", this->scheduler_item_pool_.size());
|
||||||
|
#endif
|
||||||
|
} else {
|
||||||
|
#ifdef ESPHOME_DEBUG_SCHEDULER
|
||||||
|
ESP_LOGVV(TAG, "Pool full (size: %zu), deleting item", this->scheduler_item_pool_.size());
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
// else: unique_ptr will delete the item when it goes out of scope
|
// else: unique_ptr will delete the item when it goes out of scope
|
||||||
}
|
}
|
||||||
|
@@ -325,7 +325,7 @@ class Scheduler {
|
|||||||
// Memory pool for recycling SchedulerItem objects to reduce heap churn.
|
// Memory pool for recycling SchedulerItem objects to reduce heap churn.
|
||||||
// Design decisions:
|
// Design decisions:
|
||||||
// - std::vector is used instead of a fixed array because many systems only need 1-2 scheduler items
|
// - std::vector is used instead of a fixed array because many systems only need 1-2 scheduler items
|
||||||
// - The vector grows dynamically up to MAX_POOL_SIZE (8) only when needed, saving memory on simple setups
|
// - The vector grows dynamically up to MAX_POOL_SIZE (10) only when needed, saving memory on simple setups
|
||||||
// - This approach balances memory efficiency for simple configs with performance for complex ones
|
// - This approach balances memory efficiency for simple configs with performance for complex ones
|
||||||
// - The pool significantly reduces heap fragmentation which is critical because heap allocation/deallocation
|
// - The pool significantly reduces heap fragmentation which is critical because heap allocation/deallocation
|
||||||
// can stall the entire system, causing timing issues and dropped events for any components that need
|
// can stall the entire system, causing timing issues and dropped events for any components that need
|
||||||
|
251
tests/integration/fixtures/scheduler_pool.yaml
Normal file
251
tests/integration/fixtures/scheduler_pool.yaml
Normal file
@@ -0,0 +1,251 @@
|
|||||||
|
esphome:
|
||||||
|
name: scheduler-pool-test
|
||||||
|
on_boot:
|
||||||
|
priority: -100
|
||||||
|
then:
|
||||||
|
- logger.log: "Starting scheduler pool tests"
|
||||||
|
debug_scheduler: true # Enable scheduler debug logging
|
||||||
|
|
||||||
|
host:
|
||||||
|
api:
|
||||||
|
services:
|
||||||
|
- service: run_phase_1
|
||||||
|
then:
|
||||||
|
- script.execute: test_pool_recycling
|
||||||
|
- service: run_phase_2
|
||||||
|
then:
|
||||||
|
- script.execute: test_sensor_polling
|
||||||
|
- service: run_phase_3
|
||||||
|
then:
|
||||||
|
- script.execute: test_communication_patterns
|
||||||
|
- service: run_phase_4
|
||||||
|
then:
|
||||||
|
- script.execute: test_defer_patterns
|
||||||
|
- service: run_phase_5
|
||||||
|
then:
|
||||||
|
- script.execute: test_pool_reuse_verification
|
||||||
|
- service: run_phase_6
|
||||||
|
then:
|
||||||
|
- script.execute: test_full_pool_reuse
|
||||||
|
- service: run_complete
|
||||||
|
then:
|
||||||
|
- script.execute: complete_test
|
||||||
|
logger:
|
||||||
|
level: VERY_VERBOSE # Need VERY_VERBOSE to see pool debug messages
|
||||||
|
|
||||||
|
globals:
|
||||||
|
- id: create_count
|
||||||
|
type: int
|
||||||
|
initial_value: '0'
|
||||||
|
- id: cancel_count
|
||||||
|
type: int
|
||||||
|
initial_value: '0'
|
||||||
|
- id: interval_counter
|
||||||
|
type: int
|
||||||
|
initial_value: '0'
|
||||||
|
- id: pool_test_done
|
||||||
|
type: bool
|
||||||
|
initial_value: 'false'
|
||||||
|
|
||||||
|
script:
|
||||||
|
- id: test_pool_recycling
|
||||||
|
then:
|
||||||
|
- logger.log: "Testing scheduler pool recycling with realistic usage patterns"
|
||||||
|
- lambda: |-
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
|
||||||
|
// Simulate realistic component behavior with timeouts that complete naturally
|
||||||
|
ESP_LOGI("test", "Phase 1: Simulating normal component lifecycle");
|
||||||
|
|
||||||
|
// Sensor update timeouts (common pattern)
|
||||||
|
App.scheduler.set_timeout(component, "sensor_init", 10, []() {
|
||||||
|
ESP_LOGD("test", "Sensor initialized");
|
||||||
|
id(create_count)++;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Retry timeout (gets cancelled if successful)
|
||||||
|
App.scheduler.set_timeout(component, "retry_timeout", 50, []() {
|
||||||
|
ESP_LOGD("test", "Retry timeout executed");
|
||||||
|
id(create_count)++;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Simulate successful operation - cancel retry
|
||||||
|
App.scheduler.set_timeout(component, "success_sim", 20, []() {
|
||||||
|
ESP_LOGD("test", "Operation succeeded, cancelling retry");
|
||||||
|
App.scheduler.cancel_timeout(id(test_sensor), "retry_timeout");
|
||||||
|
id(cancel_count)++;
|
||||||
|
});
|
||||||
|
|
||||||
|
id(create_count) += 3;
|
||||||
|
ESP_LOGI("test", "Phase 1 complete");
|
||||||
|
|
||||||
|
- id: test_sensor_polling
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
// Simulate sensor polling pattern
|
||||||
|
ESP_LOGI("test", "Phase 2: Simulating sensor polling patterns");
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
|
||||||
|
// Multiple sensors with different update intervals
|
||||||
|
App.scheduler.set_interval(component, "temp_sensor", 100, []() {
|
||||||
|
ESP_LOGD("test", "Temperature sensor update");
|
||||||
|
id(interval_counter)++;
|
||||||
|
if (id(interval_counter) >= 3) {
|
||||||
|
App.scheduler.cancel_interval(id(test_sensor), "temp_sensor");
|
||||||
|
ESP_LOGD("test", "Temperature sensor stopped");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
App.scheduler.set_interval(component, "humidity_sensor", 150, []() {
|
||||||
|
ESP_LOGD("test", "Humidity sensor update");
|
||||||
|
id(interval_counter)++;
|
||||||
|
if (id(interval_counter) >= 5) {
|
||||||
|
App.scheduler.cancel_interval(id(test_sensor), "humidity_sensor");
|
||||||
|
ESP_LOGD("test", "Humidity sensor stopped");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
id(create_count) += 2;
|
||||||
|
ESP_LOGI("test", "Phase 2 complete");
|
||||||
|
|
||||||
|
- id: test_communication_patterns
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
// Simulate communication patterns (WiFi/API reconnects, etc)
|
||||||
|
ESP_LOGI("test", "Phase 3: Simulating communication patterns");
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
|
||||||
|
// Connection timeout pattern
|
||||||
|
App.scheduler.set_timeout(component, "connect_timeout", 200, []() {
|
||||||
|
ESP_LOGD("test", "Connection timeout - would retry");
|
||||||
|
id(create_count)++;
|
||||||
|
|
||||||
|
// Schedule retry
|
||||||
|
App.scheduler.set_timeout(id(test_sensor), "connect_retry", 100, []() {
|
||||||
|
ESP_LOGD("test", "Retrying connection");
|
||||||
|
id(create_count)++;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Heartbeat pattern
|
||||||
|
App.scheduler.set_interval(component, "heartbeat", 50, []() {
|
||||||
|
ESP_LOGD("test", "Heartbeat");
|
||||||
|
id(interval_counter)++;
|
||||||
|
if (id(interval_counter) >= 10) {
|
||||||
|
App.scheduler.cancel_interval(id(test_sensor), "heartbeat");
|
||||||
|
ESP_LOGD("test", "Heartbeat stopped");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
id(create_count) += 2;
|
||||||
|
ESP_LOGI("test", "Phase 3 complete");
|
||||||
|
|
||||||
|
- id: test_defer_patterns
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
// Simulate defer patterns (state changes, async operations)
|
||||||
|
ESP_LOGI("test", "Phase 4: Simulating heavy defer patterns like ratgdo");
|
||||||
|
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
|
||||||
|
// Simulate a burst of defer operations like ratgdo does with state updates
|
||||||
|
// These should execute immediately and recycle quickly to the pool
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
std::string defer_name = "defer_" + std::to_string(i);
|
||||||
|
App.scheduler.set_timeout(component, defer_name, 0, [i]() {
|
||||||
|
ESP_LOGD("test", "Defer %d executed", i);
|
||||||
|
// Force a small delay between defer executions to see recycling
|
||||||
|
if (i == 5) {
|
||||||
|
ESP_LOGI("test", "Half of defers executed, checking pool status");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
id(create_count) += 10;
|
||||||
|
ESP_LOGD("test", "Created 10 defer operations (0ms timeouts)");
|
||||||
|
|
||||||
|
// Also create some named defers that might get replaced
|
||||||
|
App.scheduler.set_timeout(component, "state_update", 0, []() {
|
||||||
|
ESP_LOGD("test", "State update 1");
|
||||||
|
});
|
||||||
|
|
||||||
|
// Replace the same named defer (should cancel previous)
|
||||||
|
App.scheduler.set_timeout(component, "state_update", 0, []() {
|
||||||
|
ESP_LOGD("test", "State update 2 (replaced)");
|
||||||
|
});
|
||||||
|
|
||||||
|
id(create_count) += 2;
|
||||||
|
id(cancel_count) += 1; // One cancelled due to replacement
|
||||||
|
|
||||||
|
ESP_LOGI("test", "Phase 4 complete");
|
||||||
|
|
||||||
|
- id: test_pool_reuse_verification
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
ESP_LOGI("test", "Phase 5: Verifying pool reuse after everything settles");
|
||||||
|
|
||||||
|
// Cancel any remaining intervals
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
App.scheduler.cancel_interval(component, "temp_sensor");
|
||||||
|
App.scheduler.cancel_interval(component, "humidity_sensor");
|
||||||
|
App.scheduler.cancel_interval(component, "heartbeat");
|
||||||
|
|
||||||
|
ESP_LOGD("test", "Cancelled any remaining intervals");
|
||||||
|
|
||||||
|
// The pool should have items from completed timeouts in earlier phases.
|
||||||
|
// Phase 1 had 3 timeouts that completed and were recycled.
|
||||||
|
// Phase 3 had 1 timeout that completed and was recycled.
|
||||||
|
// Phase 4 had 3 defers that completed and were recycled.
|
||||||
|
// So we should have a decent pool size already from naturally completed items.
|
||||||
|
|
||||||
|
// Now create 8 new timeouts - they should reuse from pool when available
|
||||||
|
int reuse_test_count = 8;
|
||||||
|
|
||||||
|
for (int i = 0; i < reuse_test_count; i++) {
|
||||||
|
std::string name = "reuse_test_" + std::to_string(i);
|
||||||
|
App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
|
||||||
|
ESP_LOGD("test", "Reuse test %d completed", i);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
ESP_LOGI("test", "Created %d items for reuse verification", reuse_test_count);
|
||||||
|
id(create_count) += reuse_test_count;
|
||||||
|
ESP_LOGI("test", "Phase 5 complete");
|
||||||
|
|
||||||
|
- id: test_full_pool_reuse
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
ESP_LOGI("test", "Phase 6: Testing full pool reuse after Phase 5 items complete");
|
||||||
|
|
||||||
|
// At this point, all Phase 5 timeouts should have completed and been recycled.
|
||||||
|
// The pool should be at or near its maximum size (10).
|
||||||
|
// Creating 10 new items should reuse all from the pool.
|
||||||
|
|
||||||
|
auto *component = id(test_sensor);
|
||||||
|
int full_reuse_count = 10;
|
||||||
|
|
||||||
|
for (int i = 0; i < full_reuse_count; i++) {
|
||||||
|
std::string name = "full_reuse_" + std::to_string(i);
|
||||||
|
App.scheduler.set_timeout(component, name, 10 + i * 5, [i]() {
|
||||||
|
ESP_LOGD("test", "Full reuse test %d completed", i);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
ESP_LOGI("test", "Created %d items for full pool reuse verification", full_reuse_count);
|
||||||
|
id(create_count) += full_reuse_count;
|
||||||
|
ESP_LOGI("test", "Phase 6 complete");
|
||||||
|
|
||||||
|
- id: complete_test
|
||||||
|
then:
|
||||||
|
- lambda: |-
|
||||||
|
ESP_LOGI("test", "Pool recycling test complete - created %d items, cancelled %d, intervals %d",
|
||||||
|
id(create_count), id(cancel_count), id(interval_counter));
|
||||||
|
|
||||||
|
sensor:
|
||||||
|
- platform: template
|
||||||
|
name: Test Sensor
|
||||||
|
id: test_sensor
|
||||||
|
lambda: return 1.0;
|
||||||
|
update_interval: never
|
||||||
|
|
||||||
|
# No interval - tests will be triggered from Python via API services
|
201
tests/integration/test_scheduler_pool.py
Normal file
201
tests/integration/test_scheduler_pool.py
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
"""Integration test for scheduler memory pool functionality."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_scheduler_pool(
|
||||||
|
yaml_config: str,
|
||||||
|
run_compiled: RunCompiledFunction,
|
||||||
|
api_client_connected: APIClientConnectedFactory,
|
||||||
|
) -> None:
|
||||||
|
"""Test that the scheduler memory pool is working correctly with realistic usage.
|
||||||
|
|
||||||
|
This test simulates real-world scheduler usage patterns and verifies that:
|
||||||
|
1. Items are recycled to the pool when timeouts complete naturally
|
||||||
|
2. Items are recycled when intervals/timeouts are cancelled
|
||||||
|
3. Items are reused from the pool for new scheduler operations
|
||||||
|
4. The pool grows gradually based on actual usage patterns
|
||||||
|
5. Pool operations are logged correctly with debug scheduler enabled
|
||||||
|
"""
|
||||||
|
# Track log messages to verify pool behavior
|
||||||
|
log_lines: list[str] = []
|
||||||
|
pool_reuse_count = 0
|
||||||
|
pool_recycle_count = 0
|
||||||
|
pool_full_count = 0
|
||||||
|
new_alloc_count = 0
|
||||||
|
|
||||||
|
# Patterns to match pool operations
|
||||||
|
reuse_pattern = re.compile(r"Reused item from pool \(pool size now: (\d+)\)")
|
||||||
|
recycle_pattern = re.compile(r"Recycled item to pool \(pool size now: (\d+)\)")
|
||||||
|
pool_full_pattern = re.compile(r"Pool full \(size: (\d+)\), deleting item")
|
||||||
|
new_alloc_pattern = re.compile(r"Allocated new item \(pool empty\)")
|
||||||
|
|
||||||
|
# Futures to track when test phases complete
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
test_complete_future: asyncio.Future[bool] = loop.create_future()
|
||||||
|
phase_futures = {
|
||||||
|
1: loop.create_future(),
|
||||||
|
2: loop.create_future(),
|
||||||
|
3: loop.create_future(),
|
||||||
|
4: loop.create_future(),
|
||||||
|
5: loop.create_future(),
|
||||||
|
6: loop.create_future(),
|
||||||
|
}
|
||||||
|
|
||||||
|
def check_output(line: str) -> None:
|
||||||
|
"""Check log output for pool operations and phase completion."""
|
||||||
|
nonlocal pool_reuse_count, pool_recycle_count, pool_full_count, new_alloc_count
|
||||||
|
log_lines.append(line)
|
||||||
|
|
||||||
|
# Track pool operations
|
||||||
|
if reuse_pattern.search(line):
|
||||||
|
pool_reuse_count += 1
|
||||||
|
|
||||||
|
elif recycle_pattern.search(line):
|
||||||
|
pool_recycle_count += 1
|
||||||
|
|
||||||
|
elif pool_full_pattern.search(line):
|
||||||
|
pool_full_count += 1
|
||||||
|
|
||||||
|
elif new_alloc_pattern.search(line):
|
||||||
|
new_alloc_count += 1
|
||||||
|
|
||||||
|
# Track phase completion
|
||||||
|
for phase_num in range(1, 7):
|
||||||
|
if (
|
||||||
|
f"Phase {phase_num} complete" in line
|
||||||
|
and not phase_futures[phase_num].done()
|
||||||
|
):
|
||||||
|
phase_futures[phase_num].set_result(True)
|
||||||
|
|
||||||
|
# Check for test completion
|
||||||
|
if "Pool recycling test complete" in line and not test_complete_future.done():
|
||||||
|
test_complete_future.set_result(True)
|
||||||
|
|
||||||
|
# Run the test with log monitoring
|
||||||
|
async with (
|
||||||
|
run_compiled(yaml_config, line_callback=check_output),
|
||||||
|
api_client_connected() as client,
|
||||||
|
):
|
||||||
|
# Verify device is running
|
||||||
|
device_info = await client.device_info()
|
||||||
|
assert device_info is not None
|
||||||
|
assert device_info.name == "scheduler-pool-test"
|
||||||
|
|
||||||
|
# Get list of services
|
||||||
|
entities, services = await client.list_entities_services()
|
||||||
|
service_names = {s.name for s in services}
|
||||||
|
|
||||||
|
# Verify all test services are available
|
||||||
|
expected_services = {
|
||||||
|
"run_phase_1",
|
||||||
|
"run_phase_2",
|
||||||
|
"run_phase_3",
|
||||||
|
"run_phase_4",
|
||||||
|
"run_phase_5",
|
||||||
|
"run_phase_6",
|
||||||
|
"run_complete",
|
||||||
|
}
|
||||||
|
assert expected_services.issubset(service_names), (
|
||||||
|
f"Missing services: {expected_services - service_names}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get service objects
|
||||||
|
phase_services = {
|
||||||
|
num: next(s for s in services if s.name == f"run_phase_{num}")
|
||||||
|
for num in range(1, 7)
|
||||||
|
}
|
||||||
|
complete_service = next(s for s in services if s.name == "run_complete")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Phase 1: Component lifecycle
|
||||||
|
client.execute_service(phase_services[1], {})
|
||||||
|
await asyncio.wait_for(phase_futures[1], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.05) # Let timeouts complete
|
||||||
|
|
||||||
|
# Phase 2: Sensor polling
|
||||||
|
client.execute_service(phase_services[2], {})
|
||||||
|
await asyncio.wait_for(phase_futures[2], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.1) # Let intervals run a bit
|
||||||
|
|
||||||
|
# Phase 3: Communication patterns
|
||||||
|
client.execute_service(phase_services[3], {})
|
||||||
|
await asyncio.wait_for(phase_futures[3], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.1) # Let heartbeat run
|
||||||
|
|
||||||
|
# Phase 4: Defer patterns
|
||||||
|
client.execute_service(phase_services[4], {})
|
||||||
|
await asyncio.wait_for(phase_futures[4], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.2) # Let everything settle and recycle
|
||||||
|
|
||||||
|
# Phase 5: Pool reuse verification
|
||||||
|
client.execute_service(phase_services[5], {})
|
||||||
|
await asyncio.wait_for(phase_futures[5], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.1) # Let Phase 5 timeouts complete and recycle
|
||||||
|
|
||||||
|
# Phase 6: Full pool reuse verification
|
||||||
|
client.execute_service(phase_services[6], {})
|
||||||
|
await asyncio.wait_for(phase_futures[6], timeout=1.0)
|
||||||
|
await asyncio.sleep(0.1) # Let Phase 6 timeouts complete
|
||||||
|
|
||||||
|
# Complete test
|
||||||
|
client.execute_service(complete_service, {})
|
||||||
|
await asyncio.wait_for(test_complete_future, timeout=0.5)
|
||||||
|
|
||||||
|
except TimeoutError as e:
|
||||||
|
# Print debug info if test times out
|
||||||
|
recent_logs = "\n".join(log_lines[-30:])
|
||||||
|
phases_completed = [num for num, fut in phase_futures.items() if fut.done()]
|
||||||
|
pytest.fail(
|
||||||
|
f"Test timed out waiting for phase/completion. Error: {e}\n"
|
||||||
|
f" Phases completed: {phases_completed}\n"
|
||||||
|
f" Pool stats:\n"
|
||||||
|
f" Reuse count: {pool_reuse_count}\n"
|
||||||
|
f" Recycle count: {pool_recycle_count}\n"
|
||||||
|
f" Pool full count: {pool_full_count}\n"
|
||||||
|
f" New alloc count: {new_alloc_count}\n"
|
||||||
|
f"Recent logs:\n{recent_logs}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify all test phases ran
|
||||||
|
for phase_num in range(1, 7):
|
||||||
|
assert phase_futures[phase_num].done(), f"Phase {phase_num} did not complete"
|
||||||
|
|
||||||
|
# Verify pool behavior
|
||||||
|
assert pool_recycle_count > 0, "Should have recycled items to pool"
|
||||||
|
|
||||||
|
# Check pool metrics
|
||||||
|
if pool_recycle_count > 0:
|
||||||
|
max_pool_size = 0
|
||||||
|
for line in log_lines:
|
||||||
|
if match := recycle_pattern.search(line):
|
||||||
|
size = int(match.group(1))
|
||||||
|
max_pool_size = max(max_pool_size, size)
|
||||||
|
|
||||||
|
# Pool can grow up to its maximum of 10
|
||||||
|
assert max_pool_size <= 10, f"Pool grew beyond maximum ({max_pool_size})"
|
||||||
|
|
||||||
|
# Log summary for debugging
|
||||||
|
print("\nScheduler Pool Test Summary (Python Orchestrated):")
|
||||||
|
print(f" Items recycled to pool: {pool_recycle_count}")
|
||||||
|
print(f" Items reused from pool: {pool_reuse_count}")
|
||||||
|
print(f" Pool full events: {pool_full_count}")
|
||||||
|
print(f" New allocations: {new_alloc_count}")
|
||||||
|
print(" All phases completed successfully")
|
||||||
|
|
||||||
|
# Verify reuse happened
|
||||||
|
if pool_reuse_count == 0 and pool_recycle_count > 3:
|
||||||
|
pytest.fail("Pool had items recycled but none were reused")
|
||||||
|
|
||||||
|
# Success - pool is working
|
||||||
|
assert pool_recycle_count > 0 or new_alloc_count < 15, (
|
||||||
|
"Pool should either recycle items or limit new allocations"
|
||||||
|
)
|
Reference in New Issue
Block a user