1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-23 20:23:50 +01:00

[sensor] Fix sliding window filter memory fragmentation with FixedVector ring buffer

This commit is contained in:
J. Nick Koston
2025-10-15 17:45:37 -10:00
parent f2e0a412db
commit d7832c44bc
2 changed files with 165 additions and 214 deletions

View File

@@ -32,50 +32,73 @@ void Filter::initialize(Sensor *parent, Filter *next) {
this->next_ = next;
}
// MedianFilter
MedianFilter::MedianFilter(size_t window_size, size_t send_every, size_t send_first_at)
: send_every_(send_every), send_at_(send_every - send_first_at), window_size_(window_size) {}
void MedianFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void MedianFilter::set_window_size(size_t window_size) { this->window_size_ = window_size; }
optional<float> MedianFilter::new_value(float value) {
while (this->queue_.size() >= this->window_size_) {
this->queue_.pop_front();
}
this->queue_.push_back(value);
ESP_LOGVV(TAG, "MedianFilter(%p)::new_value(%f)", this, value);
// SlidingWindowFilter
SlidingWindowFilter::SlidingWindowFilter(size_t window_size, size_t send_every, size_t send_first_at)
: window_size_(window_size), send_every_(send_every), send_at_(send_every - send_first_at) {
// Allocate ring buffer once at initialization
this->window_.init(window_size);
}
void SlidingWindowFilter::set_window_size(size_t window_size) {
this->window_size_ = window_size;
// Reallocate buffer with new size
this->window_.init(window_size);
this->window_head_ = 0;
this->window_count_ = 0;
}
optional<float> SlidingWindowFilter::new_value(float value) {
// Add value to ring buffer
if (this->window_count_ < this->window_size_) {
// Buffer not yet full - just append
this->window_.push_back(value);
this->window_count_++;
this->window_head_ = this->window_count_;
} else {
// Buffer full - overwrite oldest value (ring buffer)
this->window_[this->window_head_] = value;
this->window_head_ = (this->window_head_ + 1) % this->window_size_;
}
// Check if we should send a result
if (++this->send_at_ >= this->send_every_) {
this->send_at_ = 0;
float median = NAN;
if (!this->queue_.empty()) {
// Copy queue without NaN values
std::vector<float> median_queue;
median_queue.reserve(this->queue_.size());
for (auto v : this->queue_) {
if (!std::isnan(v)) {
median_queue.push_back(v);
}
}
sort(median_queue.begin(), median_queue.end());
size_t queue_size = median_queue.size();
if (queue_size) {
if (queue_size % 2) {
median = median_queue[queue_size / 2];
} else {
median = (median_queue[queue_size / 2] + median_queue[(queue_size / 2) - 1]) / 2.0f;
}
}
}
ESP_LOGVV(TAG, "MedianFilter(%p)::new_value(%f) SENDING %f", this, value, median);
return median;
float result = this->compute_result_();
ESP_LOGVV(TAG, "SlidingWindowFilter(%p)::new_value(%f) SENDING %f", this, value, result);
return result;
}
return {};
}
// SortedWindowFilter
FixedVector<float> SortedWindowFilter::get_sorted_values_() {
// Copy window without NaN values using FixedVector (no heap allocation)
FixedVector<float> sorted_values;
sorted_values.init(this->window_count_);
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
if (!std::isnan(v)) {
sorted_values.push_back(v);
}
}
sort(sorted_values.begin(), sorted_values.end());
return sorted_values;
}
// MedianFilter
float MedianFilter::compute_result_() {
FixedVector<float> sorted_values = this->get_sorted_values_();
if (sorted_values.empty())
return NAN;
size_t size = sorted_values.size();
if (size % 2) {
return sorted_values[size / 2];
} else {
return (sorted_values[size / 2] + sorted_values[(size / 2) - 1]) / 2.0f;
}
}
// SkipInitialFilter
SkipInitialFilter::SkipInitialFilter(size_t num_to_ignore) : num_to_ignore_(num_to_ignore) {}
optional<float> SkipInitialFilter::new_value(float value) {
@@ -91,136 +114,36 @@ optional<float> SkipInitialFilter::new_value(float value) {
// QuantileFilter
QuantileFilter::QuantileFilter(size_t window_size, size_t send_every, size_t send_first_at, float quantile)
: send_every_(send_every), send_at_(send_every - send_first_at), window_size_(window_size), quantile_(quantile) {}
void QuantileFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void QuantileFilter::set_window_size(size_t window_size) { this->window_size_ = window_size; }
void QuantileFilter::set_quantile(float quantile) { this->quantile_ = quantile; }
optional<float> QuantileFilter::new_value(float value) {
while (this->queue_.size() >= this->window_size_) {
this->queue_.pop_front();
}
this->queue_.push_back(value);
ESP_LOGVV(TAG, "QuantileFilter(%p)::new_value(%f), quantile:%f", this, value, this->quantile_);
: SortedWindowFilter(window_size, send_every, send_first_at), quantile_(quantile) {}
if (++this->send_at_ >= this->send_every_) {
this->send_at_ = 0;
float QuantileFilter::compute_result_() {
FixedVector<float> sorted_values = this->get_sorted_values_();
if (sorted_values.empty())
return NAN;
float result = NAN;
if (!this->queue_.empty()) {
// Copy queue without NaN values
std::vector<float> quantile_queue;
for (auto v : this->queue_) {
if (!std::isnan(v)) {
quantile_queue.push_back(v);
}
}
sort(quantile_queue.begin(), quantile_queue.end());
size_t queue_size = quantile_queue.size();
if (queue_size) {
size_t position = ceilf(queue_size * this->quantile_) - 1;
ESP_LOGVV(TAG, "QuantileFilter(%p)::position: %zu/%zu", this, position + 1, queue_size);
result = quantile_queue[position];
}
}
ESP_LOGVV(TAG, "QuantileFilter(%p)::new_value(%f) SENDING %f", this, value, result);
return result;
}
return {};
size_t position = ceilf(sorted_values.size() * this->quantile_) - 1;
ESP_LOGVV(TAG, "QuantileFilter(%p)::position: %zu/%zu", this, position + 1, sorted_values.size());
return sorted_values[position];
}
// MinFilter
MinFilter::MinFilter(size_t window_size, size_t send_every, size_t send_first_at)
: send_every_(send_every), send_at_(send_every - send_first_at), window_size_(window_size) {}
void MinFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void MinFilter::set_window_size(size_t window_size) { this->window_size_ = window_size; }
optional<float> MinFilter::new_value(float value) {
while (this->queue_.size() >= this->window_size_) {
this->queue_.pop_front();
}
this->queue_.push_back(value);
ESP_LOGVV(TAG, "MinFilter(%p)::new_value(%f)", this, value);
if (++this->send_at_ >= this->send_every_) {
this->send_at_ = 0;
float min = NAN;
for (auto v : this->queue_) {
if (!std::isnan(v)) {
min = std::isnan(min) ? v : std::min(min, v);
}
}
ESP_LOGVV(TAG, "MinFilter(%p)::new_value(%f) SENDING %f", this, value, min);
return min;
}
return {};
}
float MinFilter::compute_result_() { return this->find_extremum_<std::less<float>>(); }
// MaxFilter
MaxFilter::MaxFilter(size_t window_size, size_t send_every, size_t send_first_at)
: send_every_(send_every), send_at_(send_every - send_first_at), window_size_(window_size) {}
void MaxFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void MaxFilter::set_window_size(size_t window_size) { this->window_size_ = window_size; }
optional<float> MaxFilter::new_value(float value) {
while (this->queue_.size() >= this->window_size_) {
this->queue_.pop_front();
}
this->queue_.push_back(value);
ESP_LOGVV(TAG, "MaxFilter(%p)::new_value(%f)", this, value);
if (++this->send_at_ >= this->send_every_) {
this->send_at_ = 0;
float max = NAN;
for (auto v : this->queue_) {
if (!std::isnan(v)) {
max = std::isnan(max) ? v : std::max(max, v);
}
}
ESP_LOGVV(TAG, "MaxFilter(%p)::new_value(%f) SENDING %f", this, value, max);
return max;
}
return {};
}
float MaxFilter::compute_result_() { return this->find_extremum_<std::greater<float>>(); }
// SlidingWindowMovingAverageFilter
SlidingWindowMovingAverageFilter::SlidingWindowMovingAverageFilter(size_t window_size, size_t send_every,
size_t send_first_at)
: send_every_(send_every), send_at_(send_every - send_first_at), window_size_(window_size) {}
void SlidingWindowMovingAverageFilter::set_send_every(size_t send_every) { this->send_every_ = send_every; }
void SlidingWindowMovingAverageFilter::set_window_size(size_t window_size) { this->window_size_ = window_size; }
optional<float> SlidingWindowMovingAverageFilter::new_value(float value) {
while (this->queue_.size() >= this->window_size_) {
this->queue_.pop_front();
}
this->queue_.push_back(value);
ESP_LOGVV(TAG, "SlidingWindowMovingAverageFilter(%p)::new_value(%f)", this, value);
if (++this->send_at_ >= this->send_every_) {
this->send_at_ = 0;
float sum = 0;
size_t valid_count = 0;
for (auto v : this->queue_) {
if (!std::isnan(v)) {
sum += v;
valid_count++;
}
float SlidingWindowMovingAverageFilter::compute_result_() {
float sum = 0;
size_t valid_count = 0;
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
if (!std::isnan(v)) {
sum += v;
valid_count++;
}
float average = NAN;
if (valid_count) {
average = sum / valid_count;
}
ESP_LOGVV(TAG, "SlidingWindowMovingAverageFilter(%p)::new_value(%f) SENDING %f", this, value, average);
return average;
}
return {};
return valid_count ? sum / valid_count : NAN;
}
// ExponentialMovingAverageFilter

View File

@@ -44,11 +44,78 @@ class Filter {
Sensor *parent_{nullptr};
};
/** Base class for filters that use a sliding window of values.
*
* Uses a ring buffer to efficiently maintain a fixed-size sliding window without
* reallocations or pop_front() overhead. Eliminates deque fragmentation issues.
*/
class SlidingWindowFilter : public Filter {
public:
SlidingWindowFilter(size_t window_size, size_t send_every, size_t send_first_at);
void set_send_every(size_t send_every) { this->send_every_ = send_every; }
void set_window_size(size_t window_size);
optional<float> new_value(float value) final;
protected:
/// Called by new_value() to compute the filtered result from the current window
virtual float compute_result_() = 0;
/// Access the sliding window values (ring buffer implementation)
/// Use: for (size_t i = 0; i < window_count_; i++) { float val = window_[i]; }
FixedVector<float> window_;
size_t window_head_{0}; ///< Index where next value will be written
size_t window_count_{0}; ///< Number of valid values in window (0 to window_size_)
size_t window_size_; ///< Maximum window size
size_t send_every_; ///< Send result every N values
size_t send_at_; ///< Counter for send_every
};
/** Base class for Min/Max filters.
*
* Provides a templated helper to find extremum values efficiently.
*/
class MinMaxFilter : public SlidingWindowFilter {
public:
using SlidingWindowFilter::SlidingWindowFilter;
protected:
/// Helper to find min or max value in window, skipping NaN values
/// Usage: find_extremum_<std::less<float>>() for min, find_extremum_<std::greater<float>>() for max
template<typename Compare> float find_extremum_() {
float result = NAN;
Compare comp;
for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i];
if (!std::isnan(v)) {
result = std::isnan(result) ? v : (comp(v, result) ? v : result);
}
}
return result;
}
};
/** Base class for filters that need a sorted window (Median, Quantile).
*
* Extends SlidingWindowFilter to provide a helper that creates a sorted copy
* of non-NaN values from the window.
*/
class SortedWindowFilter : public SlidingWindowFilter {
public:
using SlidingWindowFilter::SlidingWindowFilter;
protected:
/// Helper to get sorted non-NaN values from the window
/// Returns empty FixedVector if all values are NaN
FixedVector<float> get_sorted_values_();
};
/** Simple quantile filter.
*
* Takes the quantile of the last <send_every> values and pushes it out every <send_every>.
* Takes the quantile of the last <window_size> values and pushes it out every <send_every>.
*/
class QuantileFilter : public Filter {
class QuantileFilter : public SortedWindowFilter {
public:
/** Construct a QuantileFilter.
*
@@ -61,25 +128,18 @@ class QuantileFilter : public Filter {
*/
explicit QuantileFilter(size_t window_size, size_t send_every, size_t send_first_at, float quantile);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_window_size(size_t window_size);
void set_quantile(float quantile);
void set_quantile(float quantile) { this->quantile_ = quantile; }
protected:
std::deque<float> queue_;
size_t send_every_;
size_t send_at_;
size_t window_size_;
float compute_result_() override;
float quantile_;
};
/** Simple median filter.
*
* Takes the median of the last <send_every> values and pushes it out every <send_every>.
* Takes the median of the last <window_size> values and pushes it out every <send_every>.
*/
class MedianFilter : public Filter {
class MedianFilter : public SortedWindowFilter {
public:
/** Construct a MedianFilter.
*
@@ -89,18 +149,10 @@ class MedianFilter : public Filter {
* on startup being published on the first *raw* value, so with no filter applied. Must be less than or equal to
* send_every.
*/
explicit MedianFilter(size_t window_size, size_t send_every, size_t send_first_at);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_window_size(size_t window_size);
using SortedWindowFilter::SortedWindowFilter;
protected:
std::deque<float> queue_;
size_t send_every_;
size_t send_at_;
size_t window_size_;
float compute_result_() override;
};
/** Simple skip filter.
@@ -123,9 +175,9 @@ class SkipInitialFilter : public Filter {
/** Simple min filter.
*
* Takes the min of the last <send_every> values and pushes it out every <send_every>.
* Takes the min of the last <window_size> values and pushes it out every <send_every>.
*/
class MinFilter : public Filter {
class MinFilter : public MinMaxFilter {
public:
/** Construct a MinFilter.
*
@@ -135,25 +187,17 @@ class MinFilter : public Filter {
* on startup being published on the first *raw* value, so with no filter applied. Must be less than or equal to
* send_every.
*/
explicit MinFilter(size_t window_size, size_t send_every, size_t send_first_at);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_window_size(size_t window_size);
using MinMaxFilter::MinMaxFilter;
protected:
std::deque<float> queue_;
size_t send_every_;
size_t send_at_;
size_t window_size_;
float compute_result_() override;
};
/** Simple max filter.
*
* Takes the max of the last <send_every> values and pushes it out every <send_every>.
* Takes the max of the last <window_size> values and pushes it out every <send_every>.
*/
class MaxFilter : public Filter {
class MaxFilter : public MinMaxFilter {
public:
/** Construct a MaxFilter.
*
@@ -163,18 +207,10 @@ class MaxFilter : public Filter {
* on startup being published on the first *raw* value, so with no filter applied. Must be less than or equal to
* send_every.
*/
explicit MaxFilter(size_t window_size, size_t send_every, size_t send_first_at);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_window_size(size_t window_size);
using MinMaxFilter::MinMaxFilter;
protected:
std::deque<float> queue_;
size_t send_every_;
size_t send_at_;
size_t window_size_;
float compute_result_() override;
};
/** Simple sliding window moving average filter.
@@ -182,7 +218,7 @@ class MaxFilter : public Filter {
* Essentially just takes takes the average of the last window_size values and pushes them out
* every send_every.
*/
class SlidingWindowMovingAverageFilter : public Filter {
class SlidingWindowMovingAverageFilter : public SlidingWindowFilter {
public:
/** Construct a SlidingWindowMovingAverageFilter.
*
@@ -192,18 +228,10 @@ class SlidingWindowMovingAverageFilter : public Filter {
* on startup being published on the first *raw* value, so with no filter applied. Must be less than or equal to
* send_every.
*/
explicit SlidingWindowMovingAverageFilter(size_t window_size, size_t send_every, size_t send_first_at);
optional<float> new_value(float value) override;
void set_send_every(size_t send_every);
void set_window_size(size_t window_size);
using SlidingWindowFilter::SlidingWindowFilter;
protected:
std::deque<float> queue_;
size_t send_every_;
size_t send_at_;
size_t window_size_;
float compute_result_() override;
};
/** Simple exponential moving average filter.