1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-18 07:45:56 +00:00

Merge branch 'action_chaining' into integration

This commit is contained in:
J. Nick Koston
2025-11-02 17:58:56 -06:00
23 changed files with 1274 additions and 66 deletions

View File

@@ -96,7 +96,11 @@ void loop_task(void *pv_params) {
extern "C" void app_main() { extern "C" void app_main() {
esp32::setup_preferences(); esp32::setup_preferences();
#if CONFIG_FREERTOS_UNICORE
xTaskCreate(loop_task, "loopTask", 8192, nullptr, 1, &loop_task_handle); xTaskCreate(loop_task, "loopTask", 8192, nullptr, 1, &loop_task_handle);
#else
xTaskCreatePinnedToCore(loop_task, "loopTask", 8192, nullptr, 1, &loop_task_handle, 1);
#endif
} }
#endif // USE_ESP_IDF #endif // USE_ESP_IDF

View File

@@ -39,6 +39,7 @@ CONFIG_SCHEMA = (
# due to hardware limitations or lack of reliable interrupt support. This ensures # due to hardware limitations or lack of reliable interrupt support. This ensures
# stable operation on these platforms. Future maintainers should verify platform # stable operation on these platforms. Future maintainers should verify platform
# capabilities before changing this default behavior. # capabilities before changing this default behavior.
# nrf52 has no gpio interrupts implemented yet
cv.SplitDefault( cv.SplitDefault(
CONF_USE_INTERRUPT, CONF_USE_INTERRUPT,
bk72xx=False, bk72xx=False,
@@ -46,7 +47,7 @@ CONFIG_SCHEMA = (
esp8266=True, esp8266=True,
host=True, host=True,
ln882x=False, ln882x=False,
nrf52=True, nrf52=False,
rp2040=True, rp2040=True,
rtl87xx=False, rtl87xx=False,
): cv.boolean, ): cv.boolean,

View File

@@ -55,6 +55,7 @@ CONFIG_SCHEMA = cv.Schema(
esp32=False, esp32=False,
rp2040=False, rp2040=False,
bk72xx=False, bk72xx=False,
host=False,
): cv.All( ): cv.All(
cv.boolean, cv.boolean,
cv.Any( cv.Any(
@@ -64,6 +65,7 @@ CONFIG_SCHEMA = cv.Schema(
esp8266_arduino=cv.Version(0, 0, 0), esp8266_arduino=cv.Version(0, 0, 0),
rp2040_arduino=cv.Version(0, 0, 0), rp2040_arduino=cv.Version(0, 0, 0),
bk72xx_arduino=cv.Version(1, 7, 0), bk72xx_arduino=cv.Version(1, 7, 0),
host=cv.Version(0, 0, 0),
), ),
cv.boolean_false, cv.boolean_false,
), ),

View File

@@ -323,6 +323,8 @@ void Nextion::loop() {
this->set_touch_sleep_timeout(this->touch_sleep_timeout_); this->set_touch_sleep_timeout(this->touch_sleep_timeout_);
} }
this->set_auto_wake_on_touch(this->connection_state_.auto_wake_on_touch_);
this->connection_state_.ignore_is_setup_ = false; this->connection_state_.ignore_is_setup_ = false;
} }

View File

@@ -290,6 +290,7 @@ def show_logs(config: ConfigType, args, devices: list[str]) -> bool:
address = ble_device.address address = ble_device.address
else: else:
return True return True
if is_mac_address(address): if is_mac_address(address):
asyncio.run(logger_connect(address)) asyncio.run(logger_connect(address))
return True return True

View File

@@ -33,19 +33,13 @@ Message Format:
class ABBWelcomeData { class ABBWelcomeData {
public: public:
// Make default // Make default
ABBWelcomeData() { ABBWelcomeData() : data_{0x55, 0xff} {}
std::fill(std::begin(this->data_), std::end(this->data_), 0);
this->data_[0] = 0x55;
this->data_[1] = 0xff;
}
// Make from initializer_list // Make from initializer_list
ABBWelcomeData(std::initializer_list<uint8_t> data) { ABBWelcomeData(std::initializer_list<uint8_t> data) : data_{} {
std::fill(std::begin(this->data_), std::end(this->data_), 0);
std::copy_n(data.begin(), std::min(data.size(), this->data_.size()), this->data_.begin()); std::copy_n(data.begin(), std::min(data.size(), this->data_.size()), this->data_.begin());
} }
// Make from vector // Make from vector
ABBWelcomeData(const std::vector<uint8_t> &data) { ABBWelcomeData(const std::vector<uint8_t> &data) : data_{} {
std::fill(std::begin(this->data_), std::end(this->data_), 0);
std::copy_n(data.begin(), std::min(data.size(), this->data_.size()), this->data_.begin()); std::copy_n(data.begin(), std::min(data.size(), this->data_.size()), this->data_.begin());
} }
// Default copy constructor // Default copy constructor

View File

@@ -2,6 +2,7 @@
#include <memory> #include <memory>
#include <tuple> #include <tuple>
#include <forward_list>
#include "esphome/core/automation.h" #include "esphome/core/automation.h"
#include "esphome/core/component.h" #include "esphome/core/component.h"
#include "esphome/core/helpers.h" #include "esphome/core/helpers.h"
@@ -264,10 +265,22 @@ template<class C, typename... Ts> class IsRunningCondition : public Condition<Ts
C *parent_; C *parent_;
}; };
/** Wait for a script to finish before continuing.
*
* Uses queue-based storage to safely handle concurrent executions.
* While concurrent execution from the same trigger is uncommon, it's possible
* (e.g., rapid button presses, high-frequency sensor updates), so we use
* queue-based storage for correctness.
*/
template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>, public Component { template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>, public Component {
public: public:
ScriptWaitAction(C *script) : script_(script) {} ScriptWaitAction(C *script) : script_(script) {}
void setup() override {
// Start with loop disabled - only enable when there's work to do
this->disable_loop();
}
void play_complex(Ts... x) override { void play_complex(Ts... x) override {
this->num_running_++; this->num_running_++;
// Check if we can continue immediately. // Check if we can continue immediately.
@@ -275,7 +288,11 @@ template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>,
this->play_next_(x...); this->play_next_(x...);
return; return;
} }
this->var_ = std::make_tuple(x...);
// Store parameters for later execution
this->param_queue_.emplace_front(x...);
// Enable loop now that we have work to do
this->enable_loop();
this->loop(); this->loop();
} }
@@ -286,15 +303,30 @@ template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>,
if (this->script_->is_running()) if (this->script_->is_running())
return; return;
this->play_next_tuple_(this->var_); while (!this->param_queue_.empty()) {
auto &params = this->param_queue_.front();
this->play_next_tuple_(params, typename gens<sizeof...(Ts)>::type());
this->param_queue_.pop_front();
}
// Queue is now empty - disable loop until next play_complex
this->disable_loop();
} }
void play(Ts... x) override { /* ignore - see play_complex */ void play(Ts... x) override { /* ignore - see play_complex */
} }
void stop() override {
this->param_queue_.clear();
this->disable_loop();
}
protected: protected:
template<int... S> void play_next_tuple_(const std::tuple<Ts...> &tuple, seq<S...> /*unused*/) {
this->play_next_(std::get<S>(tuple)...);
}
C *script_; C *script_;
std::tuple<Ts...> var_{}; std::forward_list<std::tuple<Ts...>> param_queue_;
}; };
} // namespace script } // namespace script

View File

@@ -8,8 +8,8 @@ namespace zephyr {
static const char *const TAG = "zephyr"; static const char *const TAG = "zephyr";
static int flags_to_mode(gpio::Flags flags, bool inverted, bool value) { static gpio_flags_t flags_to_mode(gpio::Flags flags, bool inverted, bool value) {
int ret = 0; gpio_flags_t ret = 0;
if (flags & gpio::FLAG_INPUT) { if (flags & gpio::FLAG_INPUT) {
ret |= GPIO_INPUT; ret |= GPIO_INPUT;
} }
@@ -79,7 +79,10 @@ void ZephyrGPIOPin::pin_mode(gpio::Flags flags) {
if (nullptr == this->gpio_) { if (nullptr == this->gpio_) {
return; return;
} }
gpio_pin_configure(this->gpio_, this->pin_ % 32, flags_to_mode(flags, this->inverted_, this->value_)); auto ret = gpio_pin_configure(this->gpio_, this->pin_ % 32, flags_to_mode(flags, this->inverted_, this->value_));
if (ret != 0) {
ESP_LOGE(TAG, "gpio %u cannot be configured %d.", this->pin_, ret);
}
} }
std::string ZephyrGPIOPin::dump_summary() const { std::string ZephyrGPIOPin::dump_summary() const {

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable
from contextlib import contextmanager, suppress from contextlib import contextmanager, suppress
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
@@ -18,6 +19,7 @@ import logging
from pathlib import Path from pathlib import Path
import re import re
from string import ascii_letters, digits from string import ascii_letters, digits
import typing
import uuid as uuid_ import uuid as uuid_
import voluptuous as vol import voluptuous as vol
@@ -1763,16 +1765,37 @@ class SplitDefault(Optional):
class OnlyWith(Optional): class OnlyWith(Optional):
"""Set the default value only if the given component is loaded.""" """Set the default value only if the given component(s) is/are loaded.
def __init__(self, key, component, default=None): This validator allows configuration keys to have defaults that are only applied
when specific component(s) are loaded. Supports both single component names and
lists of components.
Args:
key: Configuration key
component: Single component name (str) or list of component names.
For lists, ALL components must be loaded for the default to apply.
default: Default value to use when condition is met
Example:
# Single component
cv.OnlyWith(CONF_MQTT_ID, "mqtt"): cv.declare_id(MQTTComponent)
# Multiple components (all must be loaded)
cv.OnlyWith(CONF_ZIGBEE_ID, ["zigbee", "nrf52"]): cv.use_id(Zigbee)
"""
def __init__(self, key, component: str | list[str], default=None) -> None:
super().__init__(key) super().__init__(key)
self._component = component self._component = component
self._default = vol.default_factory(default) self._default = vol.default_factory(default)
@property @property
def default(self): def default(self) -> Callable[[], typing.Any] | vol.Undefined:
if self._component in CORE.loaded_integrations: if isinstance(self._component, list):
if all(c in CORE.loaded_integrations for c in self._component):
return self._default
elif self._component in CORE.loaded_integrations:
return self._default return self._default
return vol.UNDEFINED return vol.UNDEFINED

View File

@@ -576,7 +576,9 @@ void Application::yield_with_select_(uint32_t delay_ms) {
// Update fd_set if socket list has changed // Update fd_set if socket list has changed
if (this->socket_fds_changed_) { if (this->socket_fds_changed_) {
FD_ZERO(&this->base_read_fds_); FD_ZERO(&this->base_read_fds_);
// fd bounds are already validated in register_socket_fd() // fd bounds are already validated in register_socket_fd() or guaranteed by platform design:
// - ESP32: LwIP guarantees fd < FD_SETSIZE by design (LWIP_SOCKET_OFFSET = FD_SETSIZE - CONFIG_LWIP_MAX_SOCKETS)
// - Other platforms: register_socket_fd() validates fd < FD_SETSIZE
for (int fd : this->socket_fds_) { for (int fd : this->socket_fds_) {
FD_SET(fd, &this->base_read_fds_); FD_SET(fd, &this->base_read_fds_);
} }

View File

@@ -220,6 +220,7 @@ template<typename... Ts> class Action {
protected: protected:
friend ActionList<Ts...>; friend ActionList<Ts...>;
template<typename... Us> friend class ContinuationAction;
virtual void play(Ts... x) = 0; virtual void play(Ts... x) = 0;
void play_next_(Ts... x) { void play_next_(Ts... x) {

View File

@@ -10,6 +10,7 @@
#include "esphome/core/helpers.h" #include "esphome/core/helpers.h"
#include <vector> #include <vector>
#include <forward_list>
namespace esphome { namespace esphome {
@@ -215,18 +216,46 @@ template<typename... Ts> class StatelessLambdaAction : public Action<Ts...> {
void (*f_)(Ts...); void (*f_)(Ts...);
}; };
/// Simple continuation action that calls play_next_ on a parent action.
/// Used internally by IfAction, WhileAction, RepeatAction, etc. to chain actions.
/// Memory: 4-8 bytes (parent pointer) vs 40 bytes (LambdaAction with std::function).
template<typename... Ts> class ContinuationAction : public Action<Ts...> {
public:
explicit ContinuationAction(Action<Ts...> *parent) : parent_(parent) {}
void play(Ts... x) override { this->parent_->play_next_(x...); }
protected:
Action<Ts...> *parent_;
};
// Forward declaration for WhileLoopContinuation
template<typename... Ts> class WhileAction;
/// Loop continuation for WhileAction that checks condition and repeats or continues.
/// Memory: 4-8 bytes (parent pointer) vs 40 bytes (LambdaAction with std::function).
template<typename... Ts> class WhileLoopContinuation : public Action<Ts...> {
public:
explicit WhileLoopContinuation(WhileAction<Ts...> *parent) : parent_(parent) {}
void play(Ts... x) override;
protected:
WhileAction<Ts...> *parent_;
};
template<typename... Ts> class IfAction : public Action<Ts...> { template<typename... Ts> class IfAction : public Action<Ts...> {
public: public:
explicit IfAction(Condition<Ts...> *condition) : condition_(condition) {} explicit IfAction(Condition<Ts...> *condition) : condition_(condition) {}
void add_then(const std::initializer_list<Action<Ts...> *> &actions) { void add_then(const std::initializer_list<Action<Ts...> *> &actions) {
this->then_.add_actions(actions); this->then_.add_actions(actions);
this->then_.add_action(new LambdaAction<Ts...>([this](Ts... x) { this->play_next_(x...); })); this->then_.add_action(new ContinuationAction<Ts...>(this));
} }
void add_else(const std::initializer_list<Action<Ts...> *> &actions) { void add_else(const std::initializer_list<Action<Ts...> *> &actions) {
this->else_.add_actions(actions); this->else_.add_actions(actions);
this->else_.add_action(new LambdaAction<Ts...>([this](Ts... x) { this->play_next_(x...); })); this->else_.add_action(new ContinuationAction<Ts...>(this));
} }
void play_complex(Ts... x) override { void play_complex(Ts... x) override {
@@ -267,33 +296,23 @@ template<typename... Ts> class WhileAction : public Action<Ts...> {
void add_then(const std::initializer_list<Action<Ts...> *> &actions) { void add_then(const std::initializer_list<Action<Ts...> *> &actions) {
this->then_.add_actions(actions); this->then_.add_actions(actions);
this->then_.add_action(new LambdaAction<Ts...>([this](Ts... x) { this->then_.add_action(new WhileLoopContinuation<Ts...>(this));
if (this->num_running_ > 0 && this->condition_->check_tuple(this->var_)) {
// play again
if (this->num_running_ > 0) {
this->then_.play_tuple(this->var_);
}
} else {
// condition false, play next
this->play_next_tuple_(this->var_);
}
}));
} }
friend class WhileLoopContinuation<Ts...>;
void play_complex(Ts... x) override { void play_complex(Ts... x) override {
this->num_running_++; this->num_running_++;
// Store loop parameters
this->var_ = std::make_tuple(x...);
// Initial condition check // Initial condition check
if (!this->condition_->check_tuple(this->var_)) { if (!this->condition_->check(x...)) {
// If new condition check failed, stop loop if running // If new condition check failed, stop loop if running
this->then_.stop(); this->then_.stop();
this->play_next_tuple_(this->var_); this->play_next_(x...);
return; return;
} }
if (this->num_running_ > 0) { if (this->num_running_ > 0) {
this->then_.play_tuple(this->var_); this->then_.play(x...);
} }
} }
@@ -305,7 +324,32 @@ template<typename... Ts> class WhileAction : public Action<Ts...> {
protected: protected:
Condition<Ts...> *condition_; Condition<Ts...> *condition_;
ActionList<Ts...> then_; ActionList<Ts...> then_;
std::tuple<Ts...> var_{}; };
// Implementation of WhileLoopContinuation::play
template<typename... Ts> void WhileLoopContinuation<Ts...>::play(Ts... x) {
if (this->parent_->num_running_ > 0 && this->parent_->condition_->check(x...)) {
// play again
this->parent_->then_.play(x...);
} else {
// condition false, play next
this->parent_->play_next_(x...);
}
}
// Forward declaration for RepeatLoopContinuation
template<typename... Ts> class RepeatAction;
/// Loop continuation for RepeatAction that increments iteration and repeats or continues.
/// Memory: 4-8 bytes (parent pointer) vs 40 bytes (LambdaAction with std::function).
template<typename... Ts> class RepeatLoopContinuation : public Action<uint32_t, Ts...> {
public:
explicit RepeatLoopContinuation(RepeatAction<Ts...> *parent) : parent_(parent) {}
void play(uint32_t iteration, Ts... x) override;
protected:
RepeatAction<Ts...> *parent_;
}; };
template<typename... Ts> class RepeatAction : public Action<Ts...> { template<typename... Ts> class RepeatAction : public Action<Ts...> {
@@ -314,23 +358,17 @@ template<typename... Ts> class RepeatAction : public Action<Ts...> {
void add_then(const std::initializer_list<Action<uint32_t, Ts...> *> &actions) { void add_then(const std::initializer_list<Action<uint32_t, Ts...> *> &actions) {
this->then_.add_actions(actions); this->then_.add_actions(actions);
this->then_.add_action(new LambdaAction<uint32_t, Ts...>([this](uint32_t iteration, Ts... x) { this->then_.add_action(new RepeatLoopContinuation<Ts...>(this));
iteration++;
if (iteration >= this->count_.value(x...)) {
this->play_next_tuple_(this->var_);
} else {
this->then_.play(iteration, x...);
}
}));
} }
friend class RepeatLoopContinuation<Ts...>;
void play_complex(Ts... x) override { void play_complex(Ts... x) override {
this->num_running_++; this->num_running_++;
this->var_ = std::make_tuple(x...);
if (this->count_.value(x...) > 0) { if (this->count_.value(x...) > 0) {
this->then_.play(0, x...); this->then_.play(0, x...);
} else { } else {
this->play_next_tuple_(this->var_); this->play_next_(x...);
} }
} }
@@ -341,15 +379,36 @@ template<typename... Ts> class RepeatAction : public Action<Ts...> {
protected: protected:
ActionList<uint32_t, Ts...> then_; ActionList<uint32_t, Ts...> then_;
std::tuple<Ts...> var_;
}; };
// Implementation of RepeatLoopContinuation::play
template<typename... Ts> void RepeatLoopContinuation<Ts...>::play(uint32_t iteration, Ts... x) {
iteration++;
if (iteration >= this->parent_->count_.value(x...)) {
this->parent_->play_next_(x...);
} else {
this->parent_->then_.play(iteration, x...);
}
}
/** Wait until a condition is true to continue execution.
*
* Uses queue-based storage to safely handle concurrent executions.
* While concurrent execution from the same trigger is uncommon, it's possible
* (e.g., rapid button presses, high-frequency sensor updates), so we use
* queue-based storage for correctness.
*/
template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Component { template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Component {
public: public:
WaitUntilAction(Condition<Ts...> *condition) : condition_(condition) {} WaitUntilAction(Condition<Ts...> *condition) : condition_(condition) {}
TEMPLATABLE_VALUE(uint32_t, timeout_value) TEMPLATABLE_VALUE(uint32_t, timeout_value)
void setup() override {
// Start with loop disabled - only enable when there's work to do
this->disable_loop();
}
void play_complex(Ts... x) override { void play_complex(Ts... x) override {
this->num_running_++; this->num_running_++;
// Check if we can continue immediately. // Check if we can continue immediately.
@@ -359,13 +418,14 @@ template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Co
} }
return; return;
} }
this->var_ = std::make_tuple(x...);
if (this->timeout_value_.has_value()) { // Store for later processing
auto f = std::bind(&WaitUntilAction<Ts...>::play_next_, this, x...); auto now = millis();
this->set_timeout("timeout", this->timeout_value_.value(x...), f); auto timeout = this->timeout_value_.optional_value(x...);
} this->var_queue_.emplace_front(now, timeout, std::make_tuple(x...));
// Enable loop now that we have work to do
this->enable_loop();
this->loop(); this->loop();
} }
@@ -373,13 +433,32 @@ template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Co
if (this->num_running_ == 0) if (this->num_running_ == 0)
return; return;
if (!this->condition_->check_tuple(this->var_)) { auto now = millis();
return;
this->var_queue_.remove_if([&](auto &queued) {
auto start = std::get<uint32_t>(queued);
auto timeout = std::get<optional<uint32_t>>(queued);
auto &var = std::get<std::tuple<Ts...>>(queued);
auto expired = timeout && (now - start) >= *timeout;
if (!expired && !this->condition_->check_tuple(var)) {
return false;
} }
this->cancel_timeout("timeout"); this->play_next_tuple_(var);
return true;
});
this->play_next_tuple_(this->var_); // If queue is now empty, disable loop until next play_complex
if (this->var_queue_.empty()) {
this->disable_loop();
}
}
void stop() override {
this->var_queue_.clear();
this->disable_loop();
} }
float get_setup_priority() const override { return setup_priority::DATA; } float get_setup_priority() const override { return setup_priority::DATA; }
@@ -387,11 +466,9 @@ template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Co
void play(Ts... x) override { /* ignore - see play_complex */ void play(Ts... x) override { /* ignore - see play_complex */
} }
void stop() override { this->cancel_timeout("timeout"); }
protected: protected:
Condition<Ts...> *condition_; Condition<Ts...> *condition_;
std::tuple<Ts...> var_{}; std::forward_list<std::tuple<uint32_t, optional<uint32_t>, std::tuple<Ts...>>> var_queue_{};
}; };
template<typename... Ts> class UpdateComponentAction : public Action<Ts...> { template<typename... Ts> class UpdateComponentAction : public Action<Ts...> {

View File

@@ -87,3 +87,99 @@ api:
- float_arr.size() - float_arr.size()
- string_arr[0].c_str() - string_arr[0].c_str()
- string_arr.size() - string_arr.size()
# Test ContinuationAction (IfAction with then/else branches)
- action: test_if_action
variables:
condition: bool
value: int
then:
- if:
condition:
lambda: 'return condition;'
then:
- logger.log:
format: "Condition true, value: %d"
args: ['value']
else:
- logger.log:
format: "Condition false, value: %d"
args: ['value']
- logger.log: "After if/else"
# Test nested IfAction (multiple ContinuationAction instances)
- action: test_nested_if
variables:
outer: bool
inner: bool
then:
- if:
condition:
lambda: 'return outer;'
then:
- if:
condition:
lambda: 'return inner;'
then:
- logger.log: "Both true"
else:
- logger.log: "Outer true, inner false"
else:
- logger.log: "Outer false"
- logger.log: "After nested if"
# Test WhileLoopContinuation (WhileAction)
- action: test_while_action
variables:
max_count: int
then:
- lambda: 'id(api_continuation_test_counter) = 0;'
- while:
condition:
lambda: 'return id(api_continuation_test_counter) < max_count;'
then:
- logger.log:
format: "While loop iteration: %d"
args: ['id(api_continuation_test_counter)']
- lambda: 'id(api_continuation_test_counter)++;'
- logger.log: "After while loop"
# Test RepeatLoopContinuation (RepeatAction)
- action: test_repeat_action
variables:
count: int
then:
- repeat:
count: !lambda 'return count;'
then:
- logger.log:
format: "Repeat iteration: %d"
args: ['iteration']
- logger.log: "After repeat"
# Test combined continuations (if + while + repeat)
- action: test_combined_continuations
variables:
do_loop: bool
loop_count: int
then:
- if:
condition:
lambda: 'return do_loop;'
then:
- repeat:
count: !lambda 'return loop_count;'
then:
- lambda: 'id(api_continuation_test_counter) = iteration;'
- while:
condition:
lambda: 'return id(api_continuation_test_counter) > 0;'
then:
- logger.log:
format: "Combined: repeat=%d, while=%d"
args: ['iteration', 'id(api_continuation_test_counter)']
- lambda: 'id(api_continuation_test_counter)--;'
else:
- logger.log: "Skipped loops"
- logger.log: "After combined test"
globals:
- id: api_continuation_test_counter
type: int
restore_value: false
initial_value: '0'

View File

@@ -0,0 +1,2 @@
network:
enable_ipv6: true

View File

@@ -0,0 +1,3 @@
nrf52:
# it is not correct bootloader for the board
bootloader: adafruit_nrf52_sd140_v6

View File

@@ -0,0 +1,105 @@
esphome:
name: action-concurrent-reentry
on_boot:
- priority: -100
then:
- repeat:
count: 5
then:
- lambda: id(handler_wait_until)->execute(id(global_counter));
- lambda: id(handler_repeat)->execute(id(global_counter));
- lambda: id(handler_while)->execute(id(global_counter));
- lambda: id(handler_script_wait)->execute(id(global_counter));
- delay: 50ms
- lambda: id(global_counter)++;
- delay: 50ms
host:
api:
globals:
- id: global_counter
type: int
script:
- id: handler_wait_until
mode: parallel
parameters:
arg: int
then:
- wait_until:
condition:
lambda: return id(global_counter) == 5;
- logger.log:
format: "AFTER wait_until ARG %d"
args:
- arg
- id: handler_script_wait
mode: parallel
parameters:
arg: int
then:
- script.wait: handler_wait_until
- logger.log:
format: "AFTER script.wait ARG %d"
args:
- arg
- id: handler_repeat
mode: parallel
parameters:
arg: int
then:
- repeat:
count: 3
then:
- logger.log:
format: "IN repeat %d ARG %d"
args:
- iteration
- arg
- delay: 100ms
- logger.log:
format: "AFTER repeat ARG %d"
args:
- arg
- id: handler_while
mode: parallel
parameters:
arg: int
then:
- while:
condition:
lambda: return id(global_counter) != 5;
then:
- logger.log:
format: "IN while ARG %d"
args:
- arg
- delay: 100ms
- logger.log:
format: "AFTER while ARG %d"
args:
- arg
logger:
level: DEBUG

View File

@@ -0,0 +1,130 @@
esphome:
name: test-automation-wait-actions
host:
api:
actions:
# Test 1: Trigger wait_until automation 5 times rapidly
- action: test_wait_until
then:
- logger.log: "=== TEST 1: Triggering wait_until automation 5 times ==="
# Publish 5 different values to trigger the on_value automation 5 times
- sensor.template.publish:
id: wait_until_sensor
state: 1
- sensor.template.publish:
id: wait_until_sensor
state: 2
- sensor.template.publish:
id: wait_until_sensor
state: 3
- sensor.template.publish:
id: wait_until_sensor
state: 4
- sensor.template.publish:
id: wait_until_sensor
state: 5
# Wait then satisfy the condition so all 5 waiting actions complete
- delay: 100ms
- globals.set:
id: test_flag
value: 'true'
# Test 2: Trigger script.wait automation 5 times rapidly
- action: test_script_wait
then:
- logger.log: "=== TEST 2: Triggering script.wait automation 5 times ==="
# Start a long-running script
- script.execute: blocking_script
# Publish 5 different values to trigger the on_value automation 5 times
- sensor.template.publish:
id: script_wait_sensor
state: 1
- sensor.template.publish:
id: script_wait_sensor
state: 2
- sensor.template.publish:
id: script_wait_sensor
state: 3
- sensor.template.publish:
id: script_wait_sensor
state: 4
- sensor.template.publish:
id: script_wait_sensor
state: 5
# Test 3: Trigger wait_until timeout automation 5 times rapidly
- action: test_wait_timeout
then:
- logger.log: "=== TEST 3: Triggering timeout automation 5 times ==="
# Publish 5 different values (condition will never be true, all will timeout)
- sensor.template.publish:
id: timeout_sensor
state: 1
- sensor.template.publish:
id: timeout_sensor
state: 2
- sensor.template.publish:
id: timeout_sensor
state: 3
- sensor.template.publish:
id: timeout_sensor
state: 4
- sensor.template.publish:
id: timeout_sensor
state: 5
logger:
level: DEBUG
globals:
- id: test_flag
type: bool
restore_value: false
initial_value: 'false'
- id: timeout_flag
type: bool
restore_value: false
initial_value: 'false'
# Sensors with wait_until/script.wait in their on_value automations
sensor:
# Test 1: on_value automation with wait_until
- platform: template
id: wait_until_sensor
on_value:
# This wait_until will be hit 5 times before any complete
- wait_until:
condition:
lambda: return id(test_flag);
- logger.log: "wait_until automation completed"
# Test 2: on_value automation with script.wait
- platform: template
id: script_wait_sensor
on_value:
# This script.wait will be hit 5 times before any complete
- script.wait: blocking_script
- logger.log: "script.wait automation completed"
# Test 3: on_value automation with wait_until timeout
- platform: template
id: timeout_sensor
on_value:
# This wait_until will be hit 5 times, all will timeout
- wait_until:
condition:
lambda: return id(timeout_flag);
timeout: 200ms
- logger.log: "timeout automation completed"
script:
# Blocking script for script.wait test
- id: blocking_script
mode: single
then:
- logger.log: "Blocking script: START"
- delay: 200ms
- logger.log: "Blocking script: END"

View File

@@ -0,0 +1,174 @@
esphome:
name: test-continuation-actions
host:
api:
actions:
# Test 1: IfAction with ContinuationAction (then/else branches)
- action: test_if_action
variables:
condition: bool
value: int
then:
- logger.log:
format: "Test if: condition=%s, value=%d"
args: ['YESNO(condition)', 'value']
- if:
condition:
lambda: 'return condition;'
then:
- logger.log:
format: "if-then executed: value=%d"
args: ['value']
else:
- logger.log:
format: "if-else executed: value=%d"
args: ['value']
- logger.log: "if completed"
# Test 2: Nested IfAction (multiple ContinuationAction instances)
- action: test_nested_if
variables:
outer: bool
inner: bool
then:
- logger.log:
format: "Test nested if: outer=%s, inner=%s"
args: ['YESNO(outer)', 'YESNO(inner)']
- if:
condition:
lambda: 'return outer;'
then:
- if:
condition:
lambda: 'return inner;'
then:
- logger.log: "nested-both-true"
else:
- logger.log: "nested-outer-true-inner-false"
else:
- logger.log: "nested-outer-false"
- logger.log: "nested if completed"
# Test 3: WhileAction with WhileLoopContinuation
- action: test_while_action
variables:
max_count: int
then:
- logger.log:
format: "Test while: max_count=%d"
args: ['max_count']
- globals.set:
id: continuation_test_counter
value: !lambda 'return 0;'
- while:
condition:
lambda: 'return id(continuation_test_counter) < max_count;'
then:
- logger.log:
format: "while-iteration-%d"
args: ['id(continuation_test_counter)']
- globals.set:
id: continuation_test_counter
value: !lambda 'return id(continuation_test_counter) + 1;'
- logger.log: "while completed"
# Test 4: RepeatAction with RepeatLoopContinuation
- action: test_repeat_action
variables:
count: int
then:
- logger.log:
format: "Test repeat: count=%d"
args: ['count']
- repeat:
count: !lambda 'return count;'
then:
- logger.log:
format: "repeat-iteration-%d"
args: ['iteration']
- logger.log: "repeat completed"
# Test 5: Combined continuations (if + while + repeat)
- action: test_combined
variables:
do_loop: bool
loop_count: int
then:
- logger.log:
format: "Test combined: do_loop=%s, loop_count=%d"
args: ['YESNO(do_loop)', 'loop_count']
- if:
condition:
lambda: 'return do_loop;'
then:
- repeat:
count: !lambda 'return loop_count;'
then:
- globals.set:
id: continuation_test_counter
value: !lambda 'return iteration;'
- while:
condition:
lambda: 'return id(continuation_test_counter) > 0;'
then:
- logger.log:
format: "combined-repeat%d-while%d"
args: ['iteration', 'id(continuation_test_counter)']
- globals.set:
id: continuation_test_counter
value: !lambda 'return id(continuation_test_counter) - 1;'
else:
- logger.log: "combined-skipped"
- logger.log: "combined completed"
# Test 6: Rapid triggers to verify memory efficiency
- action: test_rapid_if
then:
- logger.log: "=== Rapid if test start ==="
- sensor.template.publish:
id: rapid_sensor
state: 1
- sensor.template.publish:
id: rapid_sensor
state: 2
- sensor.template.publish:
id: rapid_sensor
state: 3
- sensor.template.publish:
id: rapid_sensor
state: 4
- sensor.template.publish:
id: rapid_sensor
state: 5
- logger.log: "=== Rapid if test published 5 values ==="
logger:
level: DEBUG
globals:
- id: continuation_test_counter
type: int
restore_value: false
initial_value: '0'
# Sensor to test rapid automation triggers with if/else (ContinuationAction)
sensor:
- platform: template
id: rapid_sensor
on_value:
- if:
condition:
lambda: 'return x > 2;'
then:
- logger.log:
format: "rapid-if-then: value=%d"
args: ['(int)x']
else:
- logger.log:
format: "rapid-if-else: value=%d"
args: ['(int)x']
- logger.log:
format: "rapid-if-completed: value=%d"
args: ['(int)x']

View File

@@ -0,0 +1,92 @@
"""Integration test for API conditional memory optimization with triggers and services."""
from __future__ import annotations
import asyncio
import collections
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_action_concurrent_reentry(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""
This test runs a script in parallel with varying arguments and verifies if
each script keeps its original argument throughout its execution
"""
test_complete = asyncio.Event()
expected = {0, 1, 2, 3, 4}
# Patterns to match in logs
after_wait_until_pattern = re.compile(r"AFTER wait_until ARG (\d+)")
after_script_wait_pattern = re.compile(r"AFTER script\.wait ARG (\d+)")
after_repeat_pattern = re.compile(r"AFTER repeat ARG (\d+)")
in_repeat_pattern = re.compile(r"IN repeat (\d+) ARG (\d+)")
after_while_pattern = re.compile(r"AFTER while ARG (\d+)")
in_while_pattern = re.compile(r"IN while ARG (\d+)")
after_wait_until_args = []
after_script_wait_args = []
after_while_args = []
in_while_args = []
after_repeat_args = []
in_repeat_args = collections.defaultdict(list)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if test_complete.is_set():
return
if mo := after_wait_until_pattern.search(line):
after_wait_until_args.append(int(mo.group(1)))
elif mo := after_script_wait_pattern.search(line):
after_script_wait_args.append(int(mo.group(1)))
elif mo := in_while_pattern.search(line):
in_while_args.append(int(mo.group(1)))
elif mo := after_while_pattern.search(line):
after_while_args.append(int(mo.group(1)))
elif mo := in_repeat_pattern.search(line):
in_repeat_args[int(mo.group(1))].append(int(mo.group(2)))
elif mo := after_repeat_pattern.search(line):
after_repeat_args.append(int(mo.group(1)))
if len(after_repeat_args) == len(expected):
test_complete.set()
# Run with log monitoring
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Verify device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "action-concurrent-reentry"
# Wait for tests to complete with timeout
try:
await asyncio.wait_for(test_complete.wait(), timeout=8.0)
except TimeoutError:
pytest.fail("test timed out")
# order may change, but all args must be present
for args in in_repeat_args.values():
assert set(args) == expected
assert set(in_repeat_args.keys()) == {0, 1, 2}
assert set(after_wait_until_args) == expected, after_wait_until_args
assert set(after_script_wait_args) == expected, after_script_wait_args
assert set(after_repeat_args) == expected, after_repeat_args
assert set(after_while_args) == expected, after_while_args
assert dict(collections.Counter(in_while_args)) == {
0: 5,
1: 4,
2: 3,
3: 2,
4: 1,
}, in_while_args

View File

@@ -0,0 +1,104 @@
"""Test concurrent execution of wait_until and script.wait in direct automation actions."""
from __future__ import annotations
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_automation_wait_actions(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""
Test that wait_until and script.wait correctly handle concurrent executions
when automation actions (not scripts) are triggered multiple times rapidly.
This tests sensor.on_value automations being triggered 5 times before any complete.
"""
loop = asyncio.get_running_loop()
# Track completion counts
test_results = {
"wait_until": 0,
"script_wait": 0,
"wait_until_timeout": 0,
}
# Patterns for log messages
wait_until_complete = re.compile(r"wait_until automation completed")
script_wait_complete = re.compile(r"script\.wait automation completed")
timeout_complete = re.compile(r"timeout automation completed")
# Test completion futures
test1_complete = loop.create_future()
test2_complete = loop.create_future()
test3_complete = loop.create_future()
def check_output(line: str) -> None:
"""Check log output for completion messages."""
# Test 1: wait_until concurrent execution
if wait_until_complete.search(line):
test_results["wait_until"] += 1
if test_results["wait_until"] == 5 and not test1_complete.done():
test1_complete.set_result(True)
# Test 2: script.wait concurrent execution
if script_wait_complete.search(line):
test_results["script_wait"] += 1
if test_results["script_wait"] == 5 and not test2_complete.done():
test2_complete.set_result(True)
# Test 3: wait_until with timeout
if timeout_complete.search(line):
test_results["wait_until_timeout"] += 1
if test_results["wait_until_timeout"] == 5 and not test3_complete.done():
test3_complete.set_result(True)
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Get services
_, services = await client.list_entities_services()
# Test 1: wait_until in automation - trigger 5 times rapidly
test_service = next((s for s in services if s.name == "test_wait_until"), None)
assert test_service is not None, "test_wait_until service not found"
client.execute_service(test_service, {})
await asyncio.wait_for(test1_complete, timeout=3.0)
# Verify Test 1: All 5 triggers should complete
assert test_results["wait_until"] == 5, (
f"Test 1: Expected 5 wait_until completions, got {test_results['wait_until']}"
)
# Test 2: script.wait in automation - trigger 5 times rapidly
test_service = next((s for s in services if s.name == "test_script_wait"), None)
assert test_service is not None, "test_script_wait service not found"
client.execute_service(test_service, {})
await asyncio.wait_for(test2_complete, timeout=3.0)
# Verify Test 2: All 5 triggers should complete
assert test_results["script_wait"] == 5, (
f"Test 2: Expected 5 script.wait completions, got {test_results['script_wait']}"
)
# Test 3: wait_until with timeout in automation - trigger 5 times rapidly
test_service = next(
(s for s in services if s.name == "test_wait_timeout"), None
)
assert test_service is not None, "test_wait_timeout service not found"
client.execute_service(test_service, {})
await asyncio.wait_for(test3_complete, timeout=3.0)
# Verify Test 3: All 5 triggers should timeout and complete
assert test_results["wait_until_timeout"] == 5, (
f"Test 3: Expected 5 timeout completions, got {test_results['wait_until_timeout']}"
)

View File

@@ -0,0 +1,235 @@
"""Test continuation actions (ContinuationAction, WhileLoopContinuation, RepeatLoopContinuation)."""
from __future__ import annotations
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_continuation_actions(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""
Test that continuation actions work correctly for if/while/repeat.
These continuation classes replace LambdaAction with simple parent pointers,
saving 32-36 bytes per instance and eliminating std::function overhead.
"""
loop = asyncio.get_running_loop()
# Track test completions
test_results = {
"if_then": False,
"if_else": False,
"if_complete": False,
"nested_both_true": False,
"nested_outer_true_inner_false": False,
"nested_outer_false": False,
"nested_complete": False,
"while_iterations": 0,
"while_complete": False,
"repeat_iterations": 0,
"repeat_complete": False,
"combined_iterations": 0,
"combined_complete": False,
"rapid_then": 0,
"rapid_else": 0,
"rapid_complete": 0,
}
# Patterns for log messages
if_then_pattern = re.compile(r"if-then executed: value=(\d+)")
if_else_pattern = re.compile(r"if-else executed: value=(\d+)")
if_complete_pattern = re.compile(r"if completed")
nested_both_true_pattern = re.compile(r"nested-both-true")
nested_outer_true_inner_false_pattern = re.compile(r"nested-outer-true-inner-false")
nested_outer_false_pattern = re.compile(r"nested-outer-false")
nested_complete_pattern = re.compile(r"nested if completed")
while_iteration_pattern = re.compile(r"while-iteration-(\d+)")
while_complete_pattern = re.compile(r"while completed")
repeat_iteration_pattern = re.compile(r"repeat-iteration-(\d+)")
repeat_complete_pattern = re.compile(r"repeat completed")
combined_pattern = re.compile(r"combined-repeat(\d+)-while(\d+)")
combined_complete_pattern = re.compile(r"combined completed")
rapid_then_pattern = re.compile(r"rapid-if-then: value=(\d+)")
rapid_else_pattern = re.compile(r"rapid-if-else: value=(\d+)")
rapid_complete_pattern = re.compile(r"rapid-if-completed: value=(\d+)")
# Test completion futures
test1_complete = loop.create_future() # if action
test2_complete = loop.create_future() # nested if
test3_complete = loop.create_future() # while
test4_complete = loop.create_future() # repeat
test5_complete = loop.create_future() # combined
test6_complete = loop.create_future() # rapid
def check_output(line: str) -> None:
"""Check log output for test messages."""
# Test 1: IfAction
if if_then_pattern.search(line):
test_results["if_then"] = True
if if_else_pattern.search(line):
test_results["if_else"] = True
if if_complete_pattern.search(line):
test_results["if_complete"] = True
if not test1_complete.done():
test1_complete.set_result(True)
# Test 2: Nested IfAction
if nested_both_true_pattern.search(line):
test_results["nested_both_true"] = True
if nested_outer_true_inner_false_pattern.search(line):
test_results["nested_outer_true_inner_false"] = True
if nested_outer_false_pattern.search(line):
test_results["nested_outer_false"] = True
if nested_complete_pattern.search(line):
test_results["nested_complete"] = True
if not test2_complete.done():
test2_complete.set_result(True)
# Test 3: WhileAction
if match := while_iteration_pattern.search(line):
test_results["while_iterations"] = max(
test_results["while_iterations"], int(match.group(1)) + 1
)
if while_complete_pattern.search(line):
test_results["while_complete"] = True
if not test3_complete.done():
test3_complete.set_result(True)
# Test 4: RepeatAction
if match := repeat_iteration_pattern.search(line):
test_results["repeat_iterations"] = max(
test_results["repeat_iterations"], int(match.group(1)) + 1
)
if repeat_complete_pattern.search(line):
test_results["repeat_complete"] = True
if not test4_complete.done():
test4_complete.set_result(True)
# Test 5: Combined
if combined_pattern.search(line):
test_results["combined_iterations"] += 1
if combined_complete_pattern.search(line):
test_results["combined_complete"] = True
if not test5_complete.done():
test5_complete.set_result(True)
# Test 6: Rapid triggers
if rapid_then_pattern.search(line):
test_results["rapid_then"] += 1
if rapid_else_pattern.search(line):
test_results["rapid_else"] += 1
if rapid_complete_pattern.search(line):
test_results["rapid_complete"] += 1
if test_results["rapid_complete"] == 5 and not test6_complete.done():
test6_complete.set_result(True)
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Get services
_, services = await client.list_entities_services()
# Test 1: IfAction with then branch
test_service = next((s for s in services if s.name == "test_if_action"), None)
assert test_service is not None, "test_if_action service not found"
client.execute_service(test_service, {"condition": True, "value": 42})
await asyncio.wait_for(test1_complete, timeout=2.0)
assert test_results["if_then"], "IfAction then branch not executed"
assert test_results["if_complete"], "IfAction did not complete"
# Test 1b: IfAction with else branch
test1_complete = loop.create_future()
test_results["if_complete"] = False
client.execute_service(test_service, {"condition": False, "value": 99})
await asyncio.wait_for(test1_complete, timeout=2.0)
assert test_results["if_else"], "IfAction else branch not executed"
assert test_results["if_complete"], "IfAction did not complete"
# Test 2: Nested IfAction - test all branches
test_service = next((s for s in services if s.name == "test_nested_if"), None)
assert test_service is not None, "test_nested_if service not found"
# Both true
client.execute_service(test_service, {"outer": True, "inner": True})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_both_true"], "Nested both true not executed"
# Outer true, inner false
test2_complete = loop.create_future()
test_results["nested_complete"] = False
client.execute_service(test_service, {"outer": True, "inner": False})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_outer_true_inner_false"], (
"Nested outer true inner false not executed"
)
# Outer false
test2_complete = loop.create_future()
test_results["nested_complete"] = False
client.execute_service(test_service, {"outer": False, "inner": True})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_outer_false"], "Nested outer false not executed"
# Test 3: WhileAction
test_service = next(
(s for s in services if s.name == "test_while_action"), None
)
assert test_service is not None, "test_while_action service not found"
client.execute_service(test_service, {"max_count": 3})
await asyncio.wait_for(test3_complete, timeout=2.0)
assert test_results["while_iterations"] == 3, (
f"WhileAction expected 3 iterations, got {test_results['while_iterations']}"
)
assert test_results["while_complete"], "WhileAction did not complete"
# Test 4: RepeatAction
test_service = next(
(s for s in services if s.name == "test_repeat_action"), None
)
assert test_service is not None, "test_repeat_action service not found"
client.execute_service(test_service, {"count": 5})
await asyncio.wait_for(test4_complete, timeout=2.0)
assert test_results["repeat_iterations"] == 5, (
f"RepeatAction expected 5 iterations, got {test_results['repeat_iterations']}"
)
assert test_results["repeat_complete"], "RepeatAction did not complete"
# Test 5: Combined (if + repeat + while)
test_service = next((s for s in services if s.name == "test_combined"), None)
assert test_service is not None, "test_combined service not found"
client.execute_service(test_service, {"do_loop": True, "loop_count": 2})
await asyncio.wait_for(test5_complete, timeout=2.0)
# Should execute: repeat 2 times, each iteration does while from iteration down to 0
# iteration 0: while 0 times = 0
# iteration 1: while 1 time = 1
# Total: 1 combined log
assert test_results["combined_iterations"] >= 1, (
f"Combined expected >=1 iterations, got {test_results['combined_iterations']}"
)
assert test_results["combined_complete"], "Combined did not complete"
# Test 6: Rapid triggers (tests memory efficiency of ContinuationAction)
test_service = next((s for s in services if s.name == "test_rapid_if"), None)
assert test_service is not None, "test_rapid_if service not found"
client.execute_service(test_service, {})
await asyncio.wait_for(test6_complete, timeout=2.0)
# Values 1, 2 should hit else (<=2), values 3, 4, 5 should hit then (>2)
assert test_results["rapid_else"] == 2, (
f"Rapid test expected 2 else, got {test_results['rapid_else']}"
)
assert test_results["rapid_then"] == 3, (
f"Rapid test expected 3 then, got {test_results['rapid_then']}"
)
assert test_results["rapid_complete"] == 5, (
f"Rapid test expected 5 completions, got {test_results['rapid_complete']}"
)

View File

@@ -3,6 +3,7 @@ import string
from hypothesis import example, given from hypothesis import example, given
from hypothesis.strategies import builds, integers, ip_addresses, one_of, text from hypothesis.strategies import builds, integers, ip_addresses, one_of, text
import pytest import pytest
import voluptuous as vol
from esphome import config_validation from esphome import config_validation
from esphome.components.esp32.const import ( from esphome.components.esp32.const import (
@@ -301,8 +302,6 @@ def test_split_default(framework, platform, variant, full, idf, arduino, simple)
], ],
) )
def test_require_framework_version(framework, platform, message): def test_require_framework_version(framework, platform, message):
import voluptuous as vol
from esphome.const import ( from esphome.const import (
KEY_CORE, KEY_CORE,
KEY_FRAMEWORK_VERSION, KEY_FRAMEWORK_VERSION,
@@ -377,3 +376,129 @@ def test_require_framework_version(framework, platform, message):
config_validation.require_framework_version( config_validation.require_framework_version(
extra_message="test 5", extra_message="test 5",
)("test") )("test")
def test_only_with_single_component_loaded() -> None:
"""Test OnlyWith with single component when component is loaded."""
CORE.loaded_integrations = {"mqtt"}
schema = config_validation.Schema(
{
config_validation.OnlyWith("mqtt_id", "mqtt", default="test_mqtt"): str,
}
)
result = schema({})
assert result.get("mqtt_id") == "test_mqtt"
def test_only_with_single_component_not_loaded() -> None:
"""Test OnlyWith with single component when component is not loaded."""
CORE.loaded_integrations = set()
schema = config_validation.Schema(
{
config_validation.OnlyWith("mqtt_id", "mqtt", default="test_mqtt"): str,
}
)
result = schema({})
assert "mqtt_id" not in result
def test_only_with_list_all_components_loaded() -> None:
"""Test OnlyWith with list when all components are loaded."""
CORE.loaded_integrations = {"zigbee", "nrf52"}
schema = config_validation.Schema(
{
config_validation.OnlyWith(
"zigbee_id", ["zigbee", "nrf52"], default="test_zigbee"
): str,
}
)
result = schema({})
assert result.get("zigbee_id") == "test_zigbee"
def test_only_with_list_partial_components_loaded() -> None:
"""Test OnlyWith with list when only some components are loaded."""
CORE.loaded_integrations = {"zigbee"} # Only zigbee, not nrf52
schema = config_validation.Schema(
{
config_validation.OnlyWith(
"zigbee_id", ["zigbee", "nrf52"], default="test_zigbee"
): str,
}
)
result = schema({})
assert "zigbee_id" not in result
def test_only_with_list_no_components_loaded() -> None:
"""Test OnlyWith with list when no components are loaded."""
CORE.loaded_integrations = set()
schema = config_validation.Schema(
{
config_validation.OnlyWith(
"zigbee_id", ["zigbee", "nrf52"], default="test_zigbee"
): str,
}
)
result = schema({})
assert "zigbee_id" not in result
def test_only_with_list_multiple_components() -> None:
"""Test OnlyWith with list requiring three components."""
CORE.loaded_integrations = {"comp1", "comp2", "comp3"}
schema = config_validation.Schema(
{
config_validation.OnlyWith(
"test_id", ["comp1", "comp2", "comp3"], default="test_value"
): str,
}
)
result = schema({})
assert result.get("test_id") == "test_value"
# Test with one missing
CORE.loaded_integrations = {"comp1", "comp2"}
result = schema({})
assert "test_id" not in result
def test_only_with_empty_list() -> None:
"""Test OnlyWith with empty list (edge case)."""
CORE.loaded_integrations = set()
schema = config_validation.Schema(
{
config_validation.OnlyWith("test_id", [], default="test_value"): str,
}
)
# all([]) returns True, so default should be applied
result = schema({})
assert result.get("test_id") == "test_value"
def test_only_with_user_value_overrides_default() -> None:
"""Test OnlyWith respects user-provided values over defaults."""
CORE.loaded_integrations = {"mqtt"}
schema = config_validation.Schema(
{
config_validation.OnlyWith("mqtt_id", "mqtt", default="default_id"): str,
}
)
result = schema({"mqtt_id": "custom_id"})
assert result.get("mqtt_id") == "custom_id"