diff --git a/esphome/components/sensor/__init__.py b/esphome/components/sensor/__init__.py index 2b99f68ac0..1585a6342f 100644 --- a/esphome/components/sensor/__init__.py +++ b/esphome/components/sensor/__init__.py @@ -249,6 +249,9 @@ MaxFilter = sensor_ns.class_("MaxFilter", Filter) SlidingWindowMovingAverageFilter = sensor_ns.class_( "SlidingWindowMovingAverageFilter", Filter ) +StreamingMinFilter = sensor_ns.class_("StreamingMinFilter", Filter) +StreamingMaxFilter = sensor_ns.class_("StreamingMaxFilter", Filter) +StreamingMovingAverageFilter = sensor_ns.class_("StreamingMovingAverageFilter", Filter) ExponentialMovingAverageFilter = sensor_ns.class_( "ExponentialMovingAverageFilter", Filter ) @@ -452,6 +455,19 @@ async def skip_initial_filter_to_code(config, filter_id): @FILTER_REGISTRY.register("min", MinFilter, MIN_SCHEMA) async def min_filter_to_code(config, filter_id): + window_size = config[CONF_WINDOW_SIZE] + send_every = config[CONF_SEND_EVERY] + send_first_at = config[CONF_SEND_FIRST_AT] + + # Optimization: Use streaming filter for batch windows (window_size == send_every) + # Saves 99.98% memory for large windows (e.g., 20KB → 4 bytes for window_size=5000) + if window_size == send_every: + return cg.new_Pvariable( + filter_id, + StreamingMinFilter, + window_size, + send_first_at, + ) return cg.new_Pvariable( filter_id, config[CONF_WINDOW_SIZE], @@ -474,6 +490,19 @@ MAX_SCHEMA = cv.All( @FILTER_REGISTRY.register("max", MaxFilter, MAX_SCHEMA) async def max_filter_to_code(config, filter_id): + window_size = config[CONF_WINDOW_SIZE] + send_every = config[CONF_SEND_EVERY] + send_first_at = config[CONF_SEND_FIRST_AT] + + # Optimization: Use streaming filter for batch windows (window_size == send_every) + # Saves 99.98% memory for large windows (e.g., 20KB → 4 bytes for window_size=5000) + if window_size == send_every: + return cg.new_Pvariable( + filter_id, + StreamingMaxFilter, + window_size, + send_first_at, + ) return cg.new_Pvariable( filter_id, config[CONF_WINDOW_SIZE], @@ -500,6 +529,19 @@ SLIDING_AVERAGE_SCHEMA = cv.All( SLIDING_AVERAGE_SCHEMA, ) async def sliding_window_moving_average_filter_to_code(config, filter_id): + window_size = config[CONF_WINDOW_SIZE] + send_every = config[CONF_SEND_EVERY] + send_first_at = config[CONF_SEND_FIRST_AT] + + # Optimization: Use streaming filter for batch windows (window_size == send_every) + # Saves 99.94% memory for large windows (e.g., 20KB → 12 bytes for window_size=5000) + if window_size == send_every: + return cg.new_Pvariable( + filter_id, + StreamingMovingAverageFilter, + window_size, + send_first_at, + ) return cg.new_Pvariable( filter_id, config[CONF_WINDOW_SIZE], diff --git a/esphome/components/sensor/filter.cpp b/esphome/components/sensor/filter.cpp index 4863c00a29..c804125dcc 100644 --- a/esphome/components/sensor/filter.cpp +++ b/esphome/components/sensor/filter.cpp @@ -468,5 +468,78 @@ optional ToNTCTemperatureFilter::new_value(float value) { return temp; } +// StreamingFilter (base class) +StreamingFilter::StreamingFilter(size_t window_size, size_t send_first_at) + : window_size_(window_size), send_first_at_(send_first_at) {} + +optional StreamingFilter::new_value(float value) { + // Process the value (child class tracks min/max/sum/etc) + this->process_value(value); + + this->count_++; + + // Check if we should send (handle send_first_at for first value) + bool should_send = false; + if (this->first_send_ && this->count_ >= this->send_first_at_) { + should_send = true; + this->first_send_ = false; + } else if (!this->first_send_ && this->count_ >= this->window_size_) { + should_send = true; + } + + if (should_send) { + float result = this->compute_batch_result(); + // Reset for next batch + this->count_ = 0; + this->reset_batch(); + ESP_LOGVV(TAG, "StreamingFilter(%p)::new_value(%f) SENDING %f", this, value, result); + return result; + } + + return {}; +} + +// StreamingMinFilter +void StreamingMinFilter::process_value(float value) { + // Update running minimum (ignore NaN values) + if (!std::isnan(value)) { + this->current_min_ = std::isnan(this->current_min_) ? value : std::min(this->current_min_, value); + } +} + +float StreamingMinFilter::compute_batch_result() { return this->current_min_; } + +void StreamingMinFilter::reset_batch() { this->current_min_ = NAN; } + +// StreamingMaxFilter +void StreamingMaxFilter::process_value(float value) { + // Update running maximum (ignore NaN values) + if (!std::isnan(value)) { + this->current_max_ = std::isnan(this->current_max_) ? value : std::max(this->current_max_, value); + } +} + +float StreamingMaxFilter::compute_batch_result() { return this->current_max_; } + +void StreamingMaxFilter::reset_batch() { this->current_max_ = NAN; } + +// StreamingMovingAverageFilter +void StreamingMovingAverageFilter::process_value(float value) { + // Accumulate sum (ignore NaN values) + if (!std::isnan(value)) { + this->sum_ += value; + this->valid_count_++; + } +} + +float StreamingMovingAverageFilter::compute_batch_result() { + return this->valid_count_ > 0 ? this->sum_ / this->valid_count_ : NAN; +} + +void StreamingMovingAverageFilter::reset_batch() { + this->sum_ = 0.0f; + this->valid_count_ = 0; +} + } // namespace sensor } // namespace esphome diff --git a/esphome/components/sensor/filter.h b/esphome/components/sensor/filter.h index b391048521..c9b39b73c3 100644 --- a/esphome/components/sensor/filter.h +++ b/esphome/components/sensor/filter.h @@ -504,5 +504,81 @@ class ToNTCTemperatureFilter : public Filter { double c_; }; +/** Base class for streaming filters (batch windows where window_size == send_every). + * + * When window_size equals send_every, we don't need a sliding window. + * This base class handles the common batching logic. + */ +class StreamingFilter : public Filter { + public: + StreamingFilter(size_t window_size, size_t send_first_at); + + optional new_value(float value) final; + + protected: + /// Called by new_value() to process each value in the batch + virtual void process_value(float value) = 0; + + /// Called by new_value() to compute the result after collecting window_size values + virtual float compute_batch_result() = 0; + + /// Called by new_value() to reset internal state after sending a result + virtual void reset_batch() = 0; + + size_t window_size_; + size_t count_{0}; + size_t send_first_at_; + bool first_send_{true}; +}; + +/** Streaming min filter for batch windows (window_size == send_every). + * + * Uses O(1) memory instead of O(n) by tracking only the minimum value. + */ +class StreamingMinFilter : public StreamingFilter { + public: + using StreamingFilter::StreamingFilter; + + protected: + void process_value(float value) override; + float compute_batch_result() override; + void reset_batch() override; + + float current_min_{NAN}; +}; + +/** Streaming max filter for batch windows (window_size == send_every). + * + * Uses O(1) memory instead of O(n) by tracking only the maximum value. + */ +class StreamingMaxFilter : public StreamingFilter { + public: + using StreamingFilter::StreamingFilter; + + protected: + void process_value(float value) override; + float compute_batch_result() override; + void reset_batch() override; + + float current_max_{NAN}; +}; + +/** Streaming moving average filter for batch windows (window_size == send_every). + * + * Uses O(1) memory instead of O(n) by tracking only sum and count. + */ +class StreamingMovingAverageFilter : public StreamingFilter { + public: + using StreamingFilter::StreamingFilter; + + protected: + void process_value(float value) override; + float compute_batch_result() override; + void reset_batch() override; + + float sum_{0.0f}; + size_t valid_count_{0}; +}; + } // namespace sensor } // namespace esphome