1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-23 04:03:52 +01:00
This commit is contained in:
J. Nick Koston
2025-10-15 19:21:33 -10:00
parent a4b14902db
commit e3089ff0f6
3 changed files with 191 additions and 0 deletions

View File

@@ -249,6 +249,9 @@ MaxFilter = sensor_ns.class_("MaxFilter", Filter)
SlidingWindowMovingAverageFilter = sensor_ns.class_( SlidingWindowMovingAverageFilter = sensor_ns.class_(
"SlidingWindowMovingAverageFilter", Filter "SlidingWindowMovingAverageFilter", Filter
) )
StreamingMinFilter = sensor_ns.class_("StreamingMinFilter", Filter)
StreamingMaxFilter = sensor_ns.class_("StreamingMaxFilter", Filter)
StreamingMovingAverageFilter = sensor_ns.class_("StreamingMovingAverageFilter", Filter)
ExponentialMovingAverageFilter = sensor_ns.class_( ExponentialMovingAverageFilter = sensor_ns.class_(
"ExponentialMovingAverageFilter", Filter "ExponentialMovingAverageFilter", Filter
) )
@@ -452,6 +455,19 @@ async def skip_initial_filter_to_code(config, filter_id):
@FILTER_REGISTRY.register("min", MinFilter, MIN_SCHEMA) @FILTER_REGISTRY.register("min", MinFilter, MIN_SCHEMA)
async def min_filter_to_code(config, filter_id): async def min_filter_to_code(config, filter_id):
window_size = config[CONF_WINDOW_SIZE]
send_every = config[CONF_SEND_EVERY]
send_first_at = config[CONF_SEND_FIRST_AT]
# Optimization: Use streaming filter for batch windows (window_size == send_every)
# Saves 99.98% memory for large windows (e.g., 20KB → 4 bytes for window_size=5000)
if window_size == send_every:
return cg.new_Pvariable(
filter_id,
StreamingMinFilter,
window_size,
send_first_at,
)
return cg.new_Pvariable( return cg.new_Pvariable(
filter_id, filter_id,
config[CONF_WINDOW_SIZE], config[CONF_WINDOW_SIZE],
@@ -474,6 +490,19 @@ MAX_SCHEMA = cv.All(
@FILTER_REGISTRY.register("max", MaxFilter, MAX_SCHEMA) @FILTER_REGISTRY.register("max", MaxFilter, MAX_SCHEMA)
async def max_filter_to_code(config, filter_id): async def max_filter_to_code(config, filter_id):
window_size = config[CONF_WINDOW_SIZE]
send_every = config[CONF_SEND_EVERY]
send_first_at = config[CONF_SEND_FIRST_AT]
# Optimization: Use streaming filter for batch windows (window_size == send_every)
# Saves 99.98% memory for large windows (e.g., 20KB → 4 bytes for window_size=5000)
if window_size == send_every:
return cg.new_Pvariable(
filter_id,
StreamingMaxFilter,
window_size,
send_first_at,
)
return cg.new_Pvariable( return cg.new_Pvariable(
filter_id, filter_id,
config[CONF_WINDOW_SIZE], config[CONF_WINDOW_SIZE],
@@ -500,6 +529,19 @@ SLIDING_AVERAGE_SCHEMA = cv.All(
SLIDING_AVERAGE_SCHEMA, SLIDING_AVERAGE_SCHEMA,
) )
async def sliding_window_moving_average_filter_to_code(config, filter_id): async def sliding_window_moving_average_filter_to_code(config, filter_id):
window_size = config[CONF_WINDOW_SIZE]
send_every = config[CONF_SEND_EVERY]
send_first_at = config[CONF_SEND_FIRST_AT]
# Optimization: Use streaming filter for batch windows (window_size == send_every)
# Saves 99.94% memory for large windows (e.g., 20KB → 12 bytes for window_size=5000)
if window_size == send_every:
return cg.new_Pvariable(
filter_id,
StreamingMovingAverageFilter,
window_size,
send_first_at,
)
return cg.new_Pvariable( return cg.new_Pvariable(
filter_id, filter_id,
config[CONF_WINDOW_SIZE], config[CONF_WINDOW_SIZE],

View File

@@ -468,5 +468,78 @@ optional<float> ToNTCTemperatureFilter::new_value(float value) {
return temp; return temp;
} }
// StreamingFilter (base class)
StreamingFilter::StreamingFilter(size_t window_size, size_t send_first_at)
: window_size_(window_size), send_first_at_(send_first_at) {}
optional<float> StreamingFilter::new_value(float value) {
// Process the value (child class tracks min/max/sum/etc)
this->process_value(value);
this->count_++;
// Check if we should send (handle send_first_at for first value)
bool should_send = false;
if (this->first_send_ && this->count_ >= this->send_first_at_) {
should_send = true;
this->first_send_ = false;
} else if (!this->first_send_ && this->count_ >= this->window_size_) {
should_send = true;
}
if (should_send) {
float result = this->compute_batch_result();
// Reset for next batch
this->count_ = 0;
this->reset_batch();
ESP_LOGVV(TAG, "StreamingFilter(%p)::new_value(%f) SENDING %f", this, value, result);
return result;
}
return {};
}
// StreamingMinFilter
void StreamingMinFilter::process_value(float value) {
// Update running minimum (ignore NaN values)
if (!std::isnan(value)) {
this->current_min_ = std::isnan(this->current_min_) ? value : std::min(this->current_min_, value);
}
}
float StreamingMinFilter::compute_batch_result() { return this->current_min_; }
void StreamingMinFilter::reset_batch() { this->current_min_ = NAN; }
// StreamingMaxFilter
void StreamingMaxFilter::process_value(float value) {
// Update running maximum (ignore NaN values)
if (!std::isnan(value)) {
this->current_max_ = std::isnan(this->current_max_) ? value : std::max(this->current_max_, value);
}
}
float StreamingMaxFilter::compute_batch_result() { return this->current_max_; }
void StreamingMaxFilter::reset_batch() { this->current_max_ = NAN; }
// StreamingMovingAverageFilter
void StreamingMovingAverageFilter::process_value(float value) {
// Accumulate sum (ignore NaN values)
if (!std::isnan(value)) {
this->sum_ += value;
this->valid_count_++;
}
}
float StreamingMovingAverageFilter::compute_batch_result() {
return this->valid_count_ > 0 ? this->sum_ / this->valid_count_ : NAN;
}
void StreamingMovingAverageFilter::reset_batch() {
this->sum_ = 0.0f;
this->valid_count_ = 0;
}
} // namespace sensor } // namespace sensor
} // namespace esphome } // namespace esphome

View File

@@ -504,5 +504,81 @@ class ToNTCTemperatureFilter : public Filter {
double c_; double c_;
}; };
/** Base class for streaming filters (batch windows where window_size == send_every).
*
* When window_size equals send_every, we don't need a sliding window.
* This base class handles the common batching logic.
*/
class StreamingFilter : public Filter {
public:
StreamingFilter(size_t window_size, size_t send_first_at);
optional<float> new_value(float value) final;
protected:
/// Called by new_value() to process each value in the batch
virtual void process_value(float value) = 0;
/// Called by new_value() to compute the result after collecting window_size values
virtual float compute_batch_result() = 0;
/// Called by new_value() to reset internal state after sending a result
virtual void reset_batch() = 0;
size_t window_size_;
size_t count_{0};
size_t send_first_at_;
bool first_send_{true};
};
/** Streaming min filter for batch windows (window_size == send_every).
*
* Uses O(1) memory instead of O(n) by tracking only the minimum value.
*/
class StreamingMinFilter : public StreamingFilter {
public:
using StreamingFilter::StreamingFilter;
protected:
void process_value(float value) override;
float compute_batch_result() override;
void reset_batch() override;
float current_min_{NAN};
};
/** Streaming max filter for batch windows (window_size == send_every).
*
* Uses O(1) memory instead of O(n) by tracking only the maximum value.
*/
class StreamingMaxFilter : public StreamingFilter {
public:
using StreamingFilter::StreamingFilter;
protected:
void process_value(float value) override;
float compute_batch_result() override;
void reset_batch() override;
float current_max_{NAN};
};
/** Streaming moving average filter for batch windows (window_size == send_every).
*
* Uses O(1) memory instead of O(n) by tracking only sum and count.
*/
class StreamingMovingAverageFilter : public StreamingFilter {
public:
using StreamingFilter::StreamingFilter;
protected:
void process_value(float value) override;
float compute_batch_result() override;
void reset_batch() override;
float sum_{0.0f};
size_t valid_count_{0};
};
} // namespace sensor } // namespace sensor
} // namespace esphome } // namespace esphome