mirror of
https://github.com/esphome/esphome.git
synced 2025-10-27 05:03:48 +00:00
Merge branch 'unbound_queued_script_fix' into integration
This commit is contained in:
@@ -111,24 +111,26 @@ template<typename... Ts> class RestartScript : public Script<Ts...> {
|
|||||||
template<typename... Ts> class QueueingScript : public Script<Ts...>, public Component {
|
template<typename... Ts> class QueueingScript : public Script<Ts...>, public Component {
|
||||||
public:
|
public:
|
||||||
void execute(Ts... x) override {
|
void execute(Ts... x) override {
|
||||||
this->lazy_init_queue_();
|
|
||||||
|
|
||||||
if (this->is_action_running() || this->num_queued_ > 0) {
|
if (this->is_action_running() || this->num_queued_ > 0) {
|
||||||
// num_queued_ is the number of *queued* instances (waiting, not including currently running)
|
// num_queued_ is the number of *queued* instances (waiting, not including currently running)
|
||||||
// max_runs_ is the maximum *total* instances (running + queued)
|
// max_runs_ is the maximum *total* instances (running + queued)
|
||||||
// So we reject when num_queued_ + 1 >= max_runs_ (queued + running >= max)
|
// So we reject when num_queued_ + 1 >= max_runs_ (queued + running >= max)
|
||||||
if (this->num_queued_ + 1 >= this->max_runs_) {
|
if (this->num_queued_ + 1 >= this->max_runs_) {
|
||||||
this->esp_logw_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' maximum number of queued runs exceeded!"),
|
this->esp_logw_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' max instances (running + queued) reached!"),
|
||||||
LOG_STR_ARG(this->name_));
|
LOG_STR_ARG(this->name_));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize queue on first queued item (after capacity check)
|
||||||
|
this->lazy_init_queue_();
|
||||||
|
|
||||||
this->esp_logd_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' queueing new instance (mode: queued)"),
|
this->esp_logd_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' queueing new instance (mode: queued)"),
|
||||||
LOG_STR_ARG(this->name_));
|
LOG_STR_ARG(this->name_));
|
||||||
// Ring buffer: write to (queue_front_ + num_queued_) % (max_runs_ - 1)
|
// Ring buffer: write to (queue_front_ + num_queued_) % queue_capacity
|
||||||
size_t write_pos = (this->queue_front_ + this->num_queued_) % (this->max_runs_ - 1);
|
const size_t queue_capacity = static_cast<size_t>(this->max_runs_ - 1);
|
||||||
// Use reset() to replace the unique_ptr
|
size_t write_pos = (this->queue_front_ + this->num_queued_) % queue_capacity;
|
||||||
this->var_queue_[write_pos].reset(new std::tuple<Ts...>(std::make_tuple(x...)));
|
// Use std::make_unique to replace the unique_ptr
|
||||||
|
this->var_queue_[write_pos] = std::make_unique<std::tuple<Ts...>>(x...);
|
||||||
this->num_queued_++;
|
this->num_queued_++;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -140,9 +142,13 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
|
|
||||||
void stop() override {
|
void stop() override {
|
||||||
// Clear all queued items to free memory immediately
|
// Clear all queued items to free memory immediately
|
||||||
for (int i = 0; i < this->max_runs_ - 1; i++) {
|
if (this->var_queue_) {
|
||||||
|
const size_t queue_capacity = static_cast<size_t>(this->max_runs_ - 1);
|
||||||
|
for (size_t i = 0; i < queue_capacity; i++) {
|
||||||
this->var_queue_[i].reset();
|
this->var_queue_[i].reset();
|
||||||
}
|
}
|
||||||
|
this->var_queue_.reset();
|
||||||
|
}
|
||||||
this->num_queued_ = 0;
|
this->num_queued_ = 0;
|
||||||
this->queue_front_ = 0;
|
this->queue_front_ = 0;
|
||||||
Script<Ts...>::stop();
|
Script<Ts...>::stop();
|
||||||
@@ -152,8 +158,9 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
if (this->num_queued_ != 0 && !this->is_action_running()) {
|
if (this->num_queued_ != 0 && !this->is_action_running()) {
|
||||||
// Dequeue: decrement count, move tuple out (frees slot), advance read position
|
// Dequeue: decrement count, move tuple out (frees slot), advance read position
|
||||||
this->num_queued_--;
|
this->num_queued_--;
|
||||||
|
const size_t queue_capacity = static_cast<size_t>(this->max_runs_ - 1);
|
||||||
auto tuple_ptr = std::move(this->var_queue_[this->queue_front_]);
|
auto tuple_ptr = std::move(this->var_queue_[this->queue_front_]);
|
||||||
this->queue_front_ = (this->queue_front_ + 1) % (this->max_runs_ - 1);
|
this->queue_front_ = (this->queue_front_ + 1) % queue_capacity;
|
||||||
this->trigger_tuple_(*tuple_ptr, typename gens<sizeof...(Ts)>::type());
|
this->trigger_tuple_(*tuple_ptr, typename gens<sizeof...(Ts)>::type());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -164,13 +171,10 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
// Lazy init queue on first use - avoids setup() ordering issues and saves memory
|
// Lazy init queue on first use - avoids setup() ordering issues and saves memory
|
||||||
// if script is never executed during this boot cycle
|
// if script is never executed during this boot cycle
|
||||||
inline void lazy_init_queue_() {
|
inline void lazy_init_queue_() {
|
||||||
if (this->var_queue_.capacity() == 0) {
|
if (!this->var_queue_) {
|
||||||
// Allocate max_runs_ - 1 slots for queued items (running item is separate)
|
// Allocate array of max_runs_ - 1 slots for queued items (running item is separate)
|
||||||
this->var_queue_.init(this->max_runs_ - 1);
|
// unique_ptr array is zero-initialized, so all slots start as nullptr
|
||||||
// Initialize all unique_ptr slots to nullptr
|
this->var_queue_ = std::make_unique<std::unique_ptr<std::tuple<Ts...>>[]>(this->max_runs_ - 1);
|
||||||
for (int i = 0; i < this->max_runs_ - 1; i++) {
|
|
||||||
this->var_queue_.push_back(nullptr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,7 +185,7 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
int num_queued_ = 0; // Number of queued instances (not including currently running)
|
int num_queued_ = 0; // Number of queued instances (not including currently running)
|
||||||
int max_runs_ = 0; // Maximum total instances (running + queued)
|
int max_runs_ = 0; // Maximum total instances (running + queued)
|
||||||
size_t queue_front_ = 0; // Ring buffer read position (next item to execute)
|
size_t queue_front_ = 0; // Ring buffer read position (next item to execute)
|
||||||
FixedVector<std::unique_ptr<std::tuple<Ts...>>> var_queue_; // Ring buffer of queued parameters
|
std::unique_ptr<std::unique_ptr<std::tuple<Ts...>>[]> var_queue_; // Ring buffer of queued parameters
|
||||||
};
|
};
|
||||||
|
|
||||||
/** A script type that executes new instances in parallel.
|
/** A script type that executes new instances in parallel.
|
||||||
|
|||||||
@@ -301,7 +301,6 @@ template<typename T> class FixedVector {
|
|||||||
const T &back() const { return data_[size_ - 1]; }
|
const T &back() const { return data_[size_ - 1]; }
|
||||||
|
|
||||||
size_t size() const { return size_; }
|
size_t size() const { return size_; }
|
||||||
size_t capacity() const { return capacity_; }
|
|
||||||
bool empty() const { return size_ == 0; }
|
bool empty() const { return size_ == 0; }
|
||||||
|
|
||||||
/// Access element without bounds checking (matches std::vector behavior)
|
/// Access element without bounds checking (matches std::vector behavior)
|
||||||
|
|||||||
@@ -31,9 +31,7 @@ async def test_script_queued(
|
|||||||
# Patterns for Test 1: Queue depth
|
# Patterns for Test 1: Queue depth
|
||||||
queue_start = re.compile(r"Queue test: START item (\d+)")
|
queue_start = re.compile(r"Queue test: START item (\d+)")
|
||||||
queue_end = re.compile(r"Queue test: END item (\d+)")
|
queue_end = re.compile(r"Queue test: END item (\d+)")
|
||||||
queue_reject = re.compile(
|
queue_reject = re.compile(r"Script 'queue_depth_script' max instances")
|
||||||
r"Script 'queue_depth_script' maximum number of queued runs exceeded!"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Patterns for Test 2: Ring buffer
|
# Patterns for Test 2: Ring buffer
|
||||||
ring_start = re.compile(r"Ring buffer: START '([A-Z])'")
|
ring_start = re.compile(r"Ring buffer: START '([A-Z])'")
|
||||||
@@ -46,9 +44,7 @@ async def test_script_queued(
|
|||||||
# Patterns for Test 4: Rejection
|
# Patterns for Test 4: Rejection
|
||||||
reject_start = re.compile(r"Rejection test: START (\d+)")
|
reject_start = re.compile(r"Rejection test: START (\d+)")
|
||||||
reject_end = re.compile(r"Rejection test: END (\d+)")
|
reject_end = re.compile(r"Rejection test: END (\d+)")
|
||||||
reject_reject = re.compile(
|
reject_reject = re.compile(r"Script 'rejection_script' max instances")
|
||||||
r"Script 'rejection_script' maximum number of queued runs exceeded!"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Patterns for Test 5: No params
|
# Patterns for Test 5: No params
|
||||||
no_params_end = re.compile(r"No params: END")
|
no_params_end = re.compile(r"No params: END")
|
||||||
|
|||||||
Reference in New Issue
Block a user