mirror of
https://github.com/esphome/esphome.git
synced 2025-11-17 07:15:48 +00:00
Compare commits
9 Commits
wifi_defau
...
bh1750_loo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78a69cb744 | ||
|
|
9b14444dad | ||
|
|
8934d4b498 | ||
|
|
9b107e7f2a | ||
|
|
10bdb47eae | ||
|
|
aa097a2fe6 | ||
|
|
3b860e784c | ||
|
|
96ee38759d | ||
|
|
986d3c8f13 |
@@ -1,8 +1,8 @@
|
||||
#include "bh1750.h"
|
||||
#include "esphome/core/log.h"
|
||||
#include "esphome/core/application.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace bh1750 {
|
||||
namespace esphome::bh1750 {
|
||||
|
||||
static const char *const TAG = "bh1750.sensor";
|
||||
|
||||
@@ -13,6 +13,31 @@ static const uint8_t BH1750_COMMAND_ONE_TIME_L = 0b00100011;
|
||||
static const uint8_t BH1750_COMMAND_ONE_TIME_H = 0b00100000;
|
||||
static const uint8_t BH1750_COMMAND_ONE_TIME_H2 = 0b00100001;
|
||||
|
||||
static constexpr uint32_t MEASUREMENT_TIMEOUT_MS = 2000;
|
||||
static constexpr float HIGH_LIGHT_THRESHOLD_LX = 7000.0f;
|
||||
|
||||
// Measurement time constants (datasheet values)
|
||||
static constexpr uint16_t MTREG_DEFAULT = 69;
|
||||
static constexpr uint16_t MTREG_MIN = 31;
|
||||
static constexpr uint16_t MTREG_MAX = 254;
|
||||
static constexpr uint16_t MEAS_TIME_L_MS = 24; // L-resolution max measurement time @ mtreg=69
|
||||
static constexpr uint16_t MEAS_TIME_H_MS = 180; // H/H2-resolution max measurement time @ mtreg=69
|
||||
|
||||
// Conversion constants (datasheet formulas)
|
||||
static constexpr float RESOLUTION_DIVISOR = 1.2f; // counts to lux conversion divisor
|
||||
static constexpr float MODE_H2_DIVISOR = 2.0f; // H2 mode has 2x higher resolution
|
||||
|
||||
// MTreg calculation constants
|
||||
static constexpr int COUNTS_TARGET = 50000; // Target counts for optimal range (avoid saturation)
|
||||
static constexpr int COUNTS_NUMERATOR = 10;
|
||||
static constexpr int COUNTS_DENOMINATOR = 12;
|
||||
|
||||
// MTreg register bit manipulation constants
|
||||
static constexpr uint8_t MTREG_HI_SHIFT = 5; // High 3 bits start at bit 5
|
||||
static constexpr uint8_t MTREG_HI_MASK = 0b111; // 3-bit mask for high bits
|
||||
static constexpr uint8_t MTREG_LO_SHIFT = 0; // Low 5 bits start at bit 0
|
||||
static constexpr uint8_t MTREG_LO_MASK = 0b11111; // 5-bit mask for low bits
|
||||
|
||||
/*
|
||||
bh1750 properties:
|
||||
|
||||
@@ -43,74 +68,7 @@ void BH1750Sensor::setup() {
|
||||
this->mark_failed();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void BH1750Sensor::read_lx_(BH1750Mode mode, uint8_t mtreg, const std::function<void(float)> &f) {
|
||||
// turn on (after one-shot sensor automatically powers down)
|
||||
uint8_t turn_on = BH1750_COMMAND_POWER_ON;
|
||||
if (this->write(&turn_on, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Power on failed");
|
||||
f(NAN);
|
||||
return;
|
||||
}
|
||||
|
||||
if (active_mtreg_ != mtreg) {
|
||||
// set mtreg
|
||||
uint8_t mtreg_hi = BH1750_COMMAND_MT_REG_HI | ((mtreg >> 5) & 0b111);
|
||||
uint8_t mtreg_lo = BH1750_COMMAND_MT_REG_LO | ((mtreg >> 0) & 0b11111);
|
||||
if (this->write(&mtreg_hi, 1) != i2c::ERROR_OK || this->write(&mtreg_lo, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Set measurement time failed");
|
||||
active_mtreg_ = 0;
|
||||
f(NAN);
|
||||
return;
|
||||
}
|
||||
active_mtreg_ = mtreg;
|
||||
}
|
||||
|
||||
uint8_t cmd;
|
||||
uint16_t meas_time;
|
||||
switch (mode) {
|
||||
case BH1750_MODE_L:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_L;
|
||||
meas_time = 24 * mtreg / 69;
|
||||
break;
|
||||
case BH1750_MODE_H:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_H;
|
||||
meas_time = 180 * mtreg / 69;
|
||||
break;
|
||||
case BH1750_MODE_H2:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_H2;
|
||||
meas_time = 180 * mtreg / 69;
|
||||
break;
|
||||
default:
|
||||
f(NAN);
|
||||
return;
|
||||
}
|
||||
if (this->write(&cmd, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Start measurement failed");
|
||||
f(NAN);
|
||||
return;
|
||||
}
|
||||
|
||||
// probably not needed, but adjust for rounding
|
||||
meas_time++;
|
||||
|
||||
this->set_timeout("read", meas_time, [this, mode, mtreg, f]() {
|
||||
uint16_t raw_value;
|
||||
if (this->read(reinterpret_cast<uint8_t *>(&raw_value), 2) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Read data failed");
|
||||
f(NAN);
|
||||
return;
|
||||
}
|
||||
raw_value = i2c::i2ctohs(raw_value);
|
||||
|
||||
float lx = float(raw_value) / 1.2f;
|
||||
lx *= 69.0f / mtreg;
|
||||
if (mode == BH1750_MODE_H2)
|
||||
lx /= 2.0f;
|
||||
|
||||
f(lx);
|
||||
});
|
||||
this->state_ = IDLE;
|
||||
}
|
||||
|
||||
void BH1750Sensor::dump_config() {
|
||||
@@ -124,45 +82,188 @@ void BH1750Sensor::dump_config() {
|
||||
}
|
||||
|
||||
void BH1750Sensor::update() {
|
||||
// first do a quick measurement in L-mode with full range
|
||||
// to find right range
|
||||
this->read_lx_(BH1750_MODE_L, 31, [this](float val) {
|
||||
if (std::isnan(val)) {
|
||||
this->status_set_warning();
|
||||
this->publish_state(NAN);
|
||||
const uint32_t now = millis();
|
||||
|
||||
// Start coarse measurement to determine optimal mode/mtreg
|
||||
if (this->state_ != IDLE) {
|
||||
// Safety timeout: reset if stuck
|
||||
if (now - this->measurement_start_time_ > MEASUREMENT_TIMEOUT_MS) {
|
||||
ESP_LOGW(TAG, "Measurement timeout, resetting state");
|
||||
this->state_ = IDLE;
|
||||
} else {
|
||||
ESP_LOGW(TAG, "Previous measurement not complete, skipping update");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
BH1750Mode use_mode;
|
||||
uint8_t use_mtreg;
|
||||
if (val <= 7000) {
|
||||
use_mode = BH1750_MODE_H2;
|
||||
use_mtreg = 254;
|
||||
} else {
|
||||
use_mode = BH1750_MODE_H;
|
||||
// lx = counts / 1.2 * (69 / mtreg)
|
||||
// -> mtreg = counts / 1.2 * (69 / lx)
|
||||
// calculate for counts=50000 (allow some range to not saturate, but maximize mtreg)
|
||||
// -> mtreg = 50000*(10/12)*(69/lx)
|
||||
int ideal_mtreg = 50000 * 10 * 69 / (12 * (int) val);
|
||||
use_mtreg = std::min(254, std::max(31, ideal_mtreg));
|
||||
}
|
||||
ESP_LOGV(TAG, "L result: %f -> Calculated mode=%d, mtreg=%d", val, (int) use_mode, use_mtreg);
|
||||
if (!this->start_measurement_(BH1750_MODE_L, MTREG_MIN, now)) {
|
||||
this->status_set_warning();
|
||||
this->publish_state(NAN);
|
||||
return;
|
||||
}
|
||||
|
||||
this->read_lx_(use_mode, use_mtreg, [this](float val) {
|
||||
if (std::isnan(val)) {
|
||||
this->status_set_warning();
|
||||
this->publish_state(NAN);
|
||||
return;
|
||||
this->state_ = WAITING_COARSE_MEASUREMENT;
|
||||
this->enable_loop(); // Enable loop while measurement in progress
|
||||
}
|
||||
|
||||
void BH1750Sensor::loop() {
|
||||
const uint32_t now = App.get_loop_component_start_time();
|
||||
|
||||
switch (this->state_) {
|
||||
case IDLE:
|
||||
// Disable loop when idle to save cycles
|
||||
this->disable_loop();
|
||||
break;
|
||||
|
||||
case WAITING_COARSE_MEASUREMENT:
|
||||
if (now - this->measurement_start_time_ >= this->measurement_duration_) {
|
||||
this->state_ = READING_COARSE_RESULT;
|
||||
}
|
||||
ESP_LOGD(TAG, "'%s': Illuminance=%.1flx", this->get_name().c_str(), val);
|
||||
break;
|
||||
|
||||
case READING_COARSE_RESULT: {
|
||||
float lx;
|
||||
if (!this->read_measurement_(lx)) {
|
||||
this->fail_and_reset_();
|
||||
break;
|
||||
}
|
||||
|
||||
this->process_coarse_result_(lx);
|
||||
|
||||
// Start fine measurement with optimal settings
|
||||
if (!this->start_measurement_(this->fine_mode_, this->fine_mtreg_, now)) {
|
||||
this->fail_and_reset_();
|
||||
break;
|
||||
}
|
||||
|
||||
this->state_ = WAITING_FINE_MEASUREMENT;
|
||||
break;
|
||||
}
|
||||
|
||||
case WAITING_FINE_MEASUREMENT:
|
||||
if (now - this->measurement_start_time_ >= this->measurement_duration_) {
|
||||
this->state_ = READING_FINE_RESULT;
|
||||
}
|
||||
break;
|
||||
|
||||
case READING_FINE_RESULT: {
|
||||
float lx;
|
||||
if (!this->read_measurement_(lx)) {
|
||||
this->fail_and_reset_();
|
||||
break;
|
||||
}
|
||||
|
||||
ESP_LOGD(TAG, "'%s': Illuminance=%.1flx", this->get_name().c_str(), lx);
|
||||
this->status_clear_warning();
|
||||
this->publish_state(val);
|
||||
});
|
||||
});
|
||||
this->publish_state(lx);
|
||||
this->state_ = IDLE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool BH1750Sensor::start_measurement_(BH1750Mode mode, uint8_t mtreg, uint32_t now) {
|
||||
// Power on
|
||||
uint8_t turn_on = BH1750_COMMAND_POWER_ON;
|
||||
if (this->write(&turn_on, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Power on failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Set MTreg if changed
|
||||
if (this->active_mtreg_ != mtreg) {
|
||||
uint8_t mtreg_hi = BH1750_COMMAND_MT_REG_HI | ((mtreg >> MTREG_HI_SHIFT) & MTREG_HI_MASK);
|
||||
uint8_t mtreg_lo = BH1750_COMMAND_MT_REG_LO | ((mtreg >> MTREG_LO_SHIFT) & MTREG_LO_MASK);
|
||||
if (this->write(&mtreg_hi, 1) != i2c::ERROR_OK || this->write(&mtreg_lo, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Set measurement time failed");
|
||||
this->active_mtreg_ = 0;
|
||||
return false;
|
||||
}
|
||||
this->active_mtreg_ = mtreg;
|
||||
}
|
||||
|
||||
// Start measurement
|
||||
uint8_t cmd;
|
||||
uint16_t meas_time;
|
||||
switch (mode) {
|
||||
case BH1750_MODE_L:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_L;
|
||||
meas_time = MEAS_TIME_L_MS * mtreg / MTREG_DEFAULT;
|
||||
break;
|
||||
case BH1750_MODE_H:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_H;
|
||||
meas_time = MEAS_TIME_H_MS * mtreg / MTREG_DEFAULT;
|
||||
break;
|
||||
case BH1750_MODE_H2:
|
||||
cmd = BH1750_COMMAND_ONE_TIME_H2;
|
||||
meas_time = MEAS_TIME_H_MS * mtreg / MTREG_DEFAULT;
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this->write(&cmd, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Start measurement failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Store current measurement parameters
|
||||
this->current_mode_ = mode;
|
||||
this->current_mtreg_ = mtreg;
|
||||
this->measurement_start_time_ = now;
|
||||
this->measurement_duration_ = meas_time + 1; // Add 1ms for safety
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool BH1750Sensor::read_measurement_(float &lx_out) {
|
||||
uint16_t raw_value;
|
||||
if (this->read(reinterpret_cast<uint8_t *>(&raw_value), 2) != i2c::ERROR_OK) {
|
||||
ESP_LOGW(TAG, "Read data failed");
|
||||
return false;
|
||||
}
|
||||
raw_value = i2c::i2ctohs(raw_value);
|
||||
|
||||
float lx = float(raw_value) / RESOLUTION_DIVISOR;
|
||||
lx *= float(MTREG_DEFAULT) / this->current_mtreg_;
|
||||
if (this->current_mode_ == BH1750_MODE_H2) {
|
||||
lx /= MODE_H2_DIVISOR;
|
||||
}
|
||||
|
||||
lx_out = lx;
|
||||
return true;
|
||||
}
|
||||
|
||||
void BH1750Sensor::process_coarse_result_(float lx) {
|
||||
if (std::isnan(lx)) {
|
||||
// Use defaults if coarse measurement failed
|
||||
this->fine_mode_ = BH1750_MODE_H2;
|
||||
this->fine_mtreg_ = MTREG_MAX;
|
||||
return;
|
||||
}
|
||||
|
||||
if (lx <= HIGH_LIGHT_THRESHOLD_LX) {
|
||||
this->fine_mode_ = BH1750_MODE_H2;
|
||||
this->fine_mtreg_ = MTREG_MAX;
|
||||
} else {
|
||||
this->fine_mode_ = BH1750_MODE_H;
|
||||
// lx = counts / 1.2 * (69 / mtreg)
|
||||
// -> mtreg = counts / 1.2 * (69 / lx)
|
||||
// calculate for counts=50000 (allow some range to not saturate, but maximize mtreg)
|
||||
// -> mtreg = 50000*(10/12)*(69/lx)
|
||||
int ideal_mtreg = COUNTS_TARGET * COUNTS_NUMERATOR * MTREG_DEFAULT / (COUNTS_DENOMINATOR * (int) lx);
|
||||
this->fine_mtreg_ = std::min((int) MTREG_MAX, std::max((int) MTREG_MIN, ideal_mtreg));
|
||||
}
|
||||
|
||||
ESP_LOGV(TAG, "L result: %.1f -> Calculated mode=%d, mtreg=%d", lx, (int) this->fine_mode_, this->fine_mtreg_);
|
||||
}
|
||||
|
||||
void BH1750Sensor::fail_and_reset_() {
|
||||
this->status_set_warning();
|
||||
this->publish_state(NAN);
|
||||
this->state_ = IDLE;
|
||||
}
|
||||
|
||||
float BH1750Sensor::get_setup_priority() const { return setup_priority::DATA; }
|
||||
|
||||
} // namespace bh1750
|
||||
} // namespace esphome
|
||||
} // namespace esphome::bh1750
|
||||
|
||||
@@ -4,10 +4,9 @@
|
||||
#include "esphome/components/sensor/sensor.h"
|
||||
#include "esphome/components/i2c/i2c.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace bh1750 {
|
||||
namespace esphome::bh1750 {
|
||||
|
||||
enum BH1750Mode {
|
||||
enum BH1750Mode : uint8_t {
|
||||
BH1750_MODE_L,
|
||||
BH1750_MODE_H,
|
||||
BH1750_MODE_H2,
|
||||
@@ -21,13 +20,36 @@ class BH1750Sensor : public sensor::Sensor, public PollingComponent, public i2c:
|
||||
void setup() override;
|
||||
void dump_config() override;
|
||||
void update() override;
|
||||
void loop() override;
|
||||
float get_setup_priority() const override;
|
||||
|
||||
protected:
|
||||
void read_lx_(BH1750Mode mode, uint8_t mtreg, const std::function<void(float)> &f);
|
||||
// State machine states
|
||||
enum State : uint8_t {
|
||||
IDLE,
|
||||
WAITING_COARSE_MEASUREMENT,
|
||||
READING_COARSE_RESULT,
|
||||
WAITING_FINE_MEASUREMENT,
|
||||
READING_FINE_RESULT,
|
||||
};
|
||||
|
||||
// 4-byte aligned members
|
||||
uint32_t measurement_start_time_{0};
|
||||
uint32_t measurement_duration_{0};
|
||||
|
||||
// 1-byte members grouped together to minimize padding
|
||||
State state_{IDLE};
|
||||
BH1750Mode current_mode_{BH1750_MODE_L};
|
||||
uint8_t current_mtreg_{31};
|
||||
BH1750Mode fine_mode_{BH1750_MODE_H2};
|
||||
uint8_t fine_mtreg_{254};
|
||||
uint8_t active_mtreg_{0};
|
||||
|
||||
// Helper methods
|
||||
bool start_measurement_(BH1750Mode mode, uint8_t mtreg, uint32_t now);
|
||||
bool read_measurement_(float &lx_out);
|
||||
void process_coarse_result_(float lx);
|
||||
void fail_and_reset_();
|
||||
};
|
||||
|
||||
} // namespace bh1750
|
||||
} // namespace esphome
|
||||
} // namespace esphome::bh1750
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
#include "esphome/core/automation.h"
|
||||
#include "cover.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace cover {
|
||||
namespace esphome::cover {
|
||||
|
||||
template<typename... Ts> class OpenAction : public Action<Ts...> {
|
||||
public:
|
||||
@@ -131,5 +130,4 @@ class CoverClosedTrigger : public Trigger<> {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace cover
|
||||
} // namespace esphome
|
||||
} // namespace esphome::cover
|
||||
|
||||
@@ -6,8 +6,7 @@
|
||||
|
||||
#include "esphome/core/log.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace cover {
|
||||
namespace esphome::cover {
|
||||
|
||||
static const char *const TAG = "cover";
|
||||
|
||||
@@ -212,5 +211,4 @@ void CoverRestoreState::apply(Cover *cover) {
|
||||
cover->publish_state();
|
||||
}
|
||||
|
||||
} // namespace cover
|
||||
} // namespace esphome
|
||||
} // namespace esphome::cover
|
||||
|
||||
@@ -7,8 +7,7 @@
|
||||
|
||||
#include "cover_traits.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace cover {
|
||||
namespace esphome::cover {
|
||||
|
||||
const extern float COVER_OPEN;
|
||||
const extern float COVER_CLOSED;
|
||||
@@ -157,5 +156,4 @@ class Cover : public EntityBase, public EntityBase_DeviceClass {
|
||||
ESPPreferenceObject rtc_;
|
||||
};
|
||||
|
||||
} // namespace cover
|
||||
} // namespace esphome
|
||||
} // namespace esphome::cover
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
namespace esphome {
|
||||
namespace cover {
|
||||
namespace esphome::cover {
|
||||
|
||||
class CoverTraits {
|
||||
public:
|
||||
@@ -26,5 +25,4 @@ class CoverTraits {
|
||||
bool supports_stop_{false};
|
||||
};
|
||||
|
||||
} // namespace cover
|
||||
} // namespace esphome
|
||||
} // namespace esphome::cover
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import logging
|
||||
|
||||
import esphome.codegen as cg
|
||||
from esphome.components import time as time_
|
||||
from esphome.config_helpers import merge_config
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_ID,
|
||||
CONF_PLATFORM,
|
||||
CONF_SERVERS,
|
||||
CONF_TIME,
|
||||
PLATFORM_BK72XX,
|
||||
PLATFORM_ESP32,
|
||||
PLATFORM_ESP8266,
|
||||
@@ -12,13 +17,74 @@ from esphome.const import (
|
||||
PLATFORM_RTL87XX,
|
||||
)
|
||||
from esphome.core import CORE
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEPENDENCIES = ["network"]
|
||||
|
||||
CONF_SNTP = "sntp"
|
||||
|
||||
sntp_ns = cg.esphome_ns.namespace("sntp")
|
||||
SNTPComponent = sntp_ns.class_("SNTPComponent", time_.RealTimeClock)
|
||||
|
||||
DEFAULT_SERVERS = ["0.pool.ntp.org", "1.pool.ntp.org", "2.pool.ntp.org"]
|
||||
|
||||
|
||||
def _sntp_final_validate(config: ConfigType) -> None:
|
||||
"""Merge multiple SNTP instances into one, similar to OTA merging behavior."""
|
||||
full_conf = fv.full_config.get()
|
||||
time_confs = full_conf.get(CONF_TIME, [])
|
||||
|
||||
sntp_configs: list[ConfigType] = []
|
||||
other_time_configs: list[ConfigType] = []
|
||||
|
||||
for time_conf in time_confs:
|
||||
if time_conf.get(CONF_PLATFORM) == CONF_SNTP:
|
||||
sntp_configs.append(time_conf)
|
||||
else:
|
||||
other_time_configs.append(time_conf)
|
||||
|
||||
if len(sntp_configs) <= 1:
|
||||
return
|
||||
|
||||
# Merge all SNTP configs into the first one
|
||||
merged = sntp_configs[0]
|
||||
for sntp_conf in sntp_configs[1:]:
|
||||
# Validate that IDs are consistent if manually specified
|
||||
if merged[CONF_ID].is_manual and sntp_conf[CONF_ID].is_manual:
|
||||
raise cv.Invalid(
|
||||
f"Found multiple SNTP configurations but {CONF_ID} is inconsistent"
|
||||
)
|
||||
merged = merge_config(merged, sntp_conf)
|
||||
|
||||
# Deduplicate servers while preserving order
|
||||
servers = merged[CONF_SERVERS]
|
||||
unique_servers = list(dict.fromkeys(servers))
|
||||
|
||||
# Warn if we're dropping servers due to 3-server limit
|
||||
if len(unique_servers) > 3:
|
||||
dropped = unique_servers[3:]
|
||||
unique_servers = unique_servers[:3]
|
||||
_LOGGER.warning(
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): %s",
|
||||
dropped,
|
||||
)
|
||||
|
||||
merged[CONF_SERVERS] = unique_servers
|
||||
|
||||
_LOGGER.warning(
|
||||
"Found and merged %d SNTP time configurations into one instance",
|
||||
len(sntp_configs),
|
||||
)
|
||||
|
||||
# Replace time configs with merged SNTP + other time platforms
|
||||
other_time_configs.append(merged)
|
||||
full_conf[CONF_TIME] = other_time_configs
|
||||
fv.full_config.set(full_conf)
|
||||
|
||||
|
||||
CONFIG_SCHEMA = cv.All(
|
||||
time_.TIME_SCHEMA.extend(
|
||||
{
|
||||
@@ -40,6 +106,8 @@ CONFIG_SCHEMA = cv.All(
|
||||
),
|
||||
)
|
||||
|
||||
FINAL_VALIDATE_SCHEMA = _sntp_final_validate
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
servers = config[CONF_SERVERS]
|
||||
|
||||
@@ -56,11 +56,19 @@ uint32_t ESP8266UartComponent::get_config() {
|
||||
}
|
||||
|
||||
void ESP8266UartComponent::setup() {
|
||||
if (this->rx_pin_) {
|
||||
this->rx_pin_->setup();
|
||||
}
|
||||
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||
this->tx_pin_->setup();
|
||||
auto setup_pin_if_needed = [](InternalGPIOPin *pin) {
|
||||
if (!pin) {
|
||||
return;
|
||||
}
|
||||
const auto mask = gpio::Flags::FLAG_OPEN_DRAIN | gpio::Flags::FLAG_PULLUP | gpio::Flags::FLAG_PULLDOWN;
|
||||
if ((pin->get_flags() & mask) != gpio::Flags::FLAG_NONE) {
|
||||
pin->setup();
|
||||
}
|
||||
};
|
||||
|
||||
setup_pin_if_needed(this->rx_pin_);
|
||||
if (this->rx_pin_ != this->tx_pin_) {
|
||||
setup_pin_if_needed(this->tx_pin_);
|
||||
}
|
||||
|
||||
// Use Arduino HardwareSerial UARTs if all used pins match the ones
|
||||
|
||||
@@ -133,11 +133,19 @@ void IDFUARTComponent::load_settings(bool dump_config) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this->rx_pin_) {
|
||||
this->rx_pin_->setup();
|
||||
}
|
||||
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||
this->tx_pin_->setup();
|
||||
auto setup_pin_if_needed = [](InternalGPIOPin *pin) {
|
||||
if (!pin) {
|
||||
return;
|
||||
}
|
||||
const auto mask = gpio::Flags::FLAG_OPEN_DRAIN | gpio::Flags::FLAG_PULLUP | gpio::Flags::FLAG_PULLDOWN;
|
||||
if ((pin->get_flags() & mask) != gpio::Flags::FLAG_NONE) {
|
||||
pin->setup();
|
||||
}
|
||||
};
|
||||
|
||||
setup_pin_if_needed(this->rx_pin_);
|
||||
if (this->rx_pin_ != this->tx_pin_) {
|
||||
setup_pin_if_needed(this->tx_pin_);
|
||||
}
|
||||
|
||||
int8_t tx = this->tx_pin_ != nullptr ? this->tx_pin_->get_pin() : -1;
|
||||
|
||||
@@ -53,7 +53,7 @@ void LibreTinyUARTComponent::setup() {
|
||||
|
||||
auto shouldFallbackToSoftwareSerial = [&]() -> bool {
|
||||
auto hasFlags = [](InternalGPIOPin *pin, const gpio::Flags mask) -> bool {
|
||||
return pin && pin->get_flags() & mask != gpio::Flags::FLAG_NONE;
|
||||
return pin && (pin->get_flags() & mask) != gpio::Flags::FLAG_NONE;
|
||||
};
|
||||
if (hasFlags(this->tx_pin_, gpio::Flags::FLAG_OPEN_DRAIN | gpio::Flags::FLAG_PULLUP | gpio::Flags::FLAG_PULLDOWN) ||
|
||||
hasFlags(this->rx_pin_, gpio::Flags::FLAG_OPEN_DRAIN | gpio::Flags::FLAG_PULLUP | gpio::Flags::FLAG_PULLDOWN)) {
|
||||
|
||||
@@ -52,11 +52,19 @@ uint16_t RP2040UartComponent::get_config() {
|
||||
}
|
||||
|
||||
void RP2040UartComponent::setup() {
|
||||
if (this->rx_pin_) {
|
||||
this->rx_pin_->setup();
|
||||
}
|
||||
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||
this->tx_pin_->setup();
|
||||
auto setup_pin_if_needed = [](InternalGPIOPin *pin) {
|
||||
if (!pin) {
|
||||
return;
|
||||
}
|
||||
const auto mask = gpio::Flags::FLAG_OPEN_DRAIN | gpio::Flags::FLAG_PULLUP | gpio::Flags::FLAG_PULLDOWN;
|
||||
if ((pin->get_flags() & mask) != gpio::Flags::FLAG_NONE) {
|
||||
pin->setup();
|
||||
}
|
||||
};
|
||||
|
||||
setup_pin_if_needed(this->rx_pin_);
|
||||
if (this->rx_pin_ != this->tx_pin_) {
|
||||
setup_pin_if_needed(this->tx_pin_);
|
||||
}
|
||||
|
||||
uint16_t config = get_config();
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
import logging
|
||||
|
||||
import esphome.codegen as cg
|
||||
from esphome.components.esp32 import add_idf_component
|
||||
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
|
||||
from esphome.config_helpers import merge_config
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import CONF_ID
|
||||
from esphome.const import CONF_ID, CONF_OTA, CONF_PLATFORM, CONF_WEB_SERVER
|
||||
from esphome.core import CORE, coroutine_with_priority
|
||||
from esphome.coroutine import CoroPriority
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CODEOWNERS = ["@esphome/core"]
|
||||
DEPENDENCIES = ["network", "web_server_base"]
|
||||
@@ -12,6 +19,53 @@ DEPENDENCIES = ["network", "web_server_base"]
|
||||
web_server_ns = cg.esphome_ns.namespace("web_server")
|
||||
WebServerOTAComponent = web_server_ns.class_("WebServerOTAComponent", OTAComponent)
|
||||
|
||||
|
||||
def _web_server_ota_final_validate(config: ConfigType) -> None:
|
||||
"""Merge multiple web_server OTA instances into one.
|
||||
|
||||
Multiple web_server OTA instances register duplicate HTTP handlers for /update,
|
||||
causing undefined behavior. Merge them into a single instance.
|
||||
"""
|
||||
full_conf = fv.full_config.get()
|
||||
ota_confs = full_conf.get(CONF_OTA, [])
|
||||
|
||||
web_server_ota_configs: list[ConfigType] = []
|
||||
other_ota_configs: list[ConfigType] = []
|
||||
|
||||
for ota_conf in ota_confs:
|
||||
if ota_conf.get(CONF_PLATFORM) == CONF_WEB_SERVER:
|
||||
web_server_ota_configs.append(ota_conf)
|
||||
else:
|
||||
other_ota_configs.append(ota_conf)
|
||||
|
||||
if len(web_server_ota_configs) <= 1:
|
||||
return
|
||||
|
||||
# Merge all web_server OTA configs into the first one
|
||||
merged = web_server_ota_configs[0]
|
||||
for ota_conf in web_server_ota_configs[1:]:
|
||||
# Validate that IDs are consistent if manually specified
|
||||
if (
|
||||
merged[CONF_ID].is_manual
|
||||
and ota_conf[CONF_ID].is_manual
|
||||
and merged[CONF_ID] != ota_conf[CONF_ID]
|
||||
):
|
||||
raise cv.Invalid(
|
||||
f"Found multiple web_server OTA configurations but {CONF_ID} is inconsistent"
|
||||
)
|
||||
merged = merge_config(merged, ota_conf)
|
||||
|
||||
_LOGGER.warning(
|
||||
"Found and merged %d web_server OTA configurations into one instance",
|
||||
len(web_server_ota_configs),
|
||||
)
|
||||
|
||||
# Replace OTA configs with merged web_server + other OTA platforms
|
||||
other_ota_configs.append(merged)
|
||||
full_conf[CONF_OTA] = other_ota_configs
|
||||
fv.full_config.set(full_conf)
|
||||
|
||||
|
||||
CONFIG_SCHEMA = (
|
||||
cv.Schema(
|
||||
{
|
||||
@@ -22,6 +76,8 @@ CONFIG_SCHEMA = (
|
||||
.extend(cv.COMPONENT_SCHEMA)
|
||||
)
|
||||
|
||||
FINAL_VALIDATE_SCHEMA = _web_server_ota_final_validate
|
||||
|
||||
|
||||
@coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
|
||||
async def to_code(config):
|
||||
|
||||
@@ -489,10 +489,18 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *
|
||||
|
||||
void AsyncEventSourceResponse::destroy(void *ptr) {
|
||||
auto *rsp = static_cast<AsyncEventSourceResponse *>(ptr);
|
||||
ESP_LOGD(TAG, "Event source connection closed (fd: %d)", rsp->fd_.load());
|
||||
// Mark as dead by setting fd to 0 - will be cleaned up in the main loop
|
||||
rsp->fd_.store(0);
|
||||
// Note: We don't delete or remove from set here to avoid race conditions
|
||||
int fd = rsp->fd_.exchange(0); // Atomically get and clear fd
|
||||
|
||||
if (fd > 0) {
|
||||
ESP_LOGD(TAG, "Event source connection closed (fd: %d)", fd);
|
||||
// Immediately shut down the socket to prevent lwIP from delivering more data
|
||||
// This prevents "recv_tcp: recv for wrong pcb!" assertions when the TCP stack
|
||||
// tries to deliver queued data after the session is marked as dead
|
||||
// See: https://github.com/esphome/esphome/issues/11936
|
||||
shutdown(fd, SHUT_RDWR);
|
||||
// Note: We don't close() the socket - httpd owns it and will close it
|
||||
}
|
||||
// Session will be cleaned up in the main loop to avoid race conditions
|
||||
}
|
||||
|
||||
// helper for allowing only unique entries in the queue
|
||||
|
||||
@@ -479,14 +479,11 @@ async def to_code(config):
|
||||
cg.add(var.set_min_auth_mode(config[CONF_MIN_AUTH_MODE]))
|
||||
if config[CONF_FAST_CONNECT]:
|
||||
cg.add_define("USE_WIFI_FAST_CONNECT")
|
||||
# passive_scan defaults to false in C++ - only set if true
|
||||
if config[CONF_PASSIVE_SCAN]:
|
||||
cg.add(var.set_passive_scan(True))
|
||||
cg.add(var.set_passive_scan(config[CONF_PASSIVE_SCAN]))
|
||||
if CONF_OUTPUT_POWER in config:
|
||||
cg.add(var.set_output_power(config[CONF_OUTPUT_POWER]))
|
||||
# enable_on_boot defaults to true in C++ - only set if false
|
||||
if not config[CONF_ENABLE_ON_BOOT]:
|
||||
cg.add(var.set_enable_on_boot(False))
|
||||
|
||||
cg.add(var.set_enable_on_boot(config[CONF_ENABLE_ON_BOOT]))
|
||||
|
||||
if CORE.is_esp8266:
|
||||
cg.add_library("ESP8266WiFi", None)
|
||||
|
||||
@@ -526,7 +526,7 @@ class WiFiComponent : public Component {
|
||||
bool btm_{false};
|
||||
bool rrm_{false};
|
||||
#endif
|
||||
bool enable_on_boot_{true};
|
||||
bool enable_on_boot_;
|
||||
bool got_ipv4_address_{false};
|
||||
bool keep_scan_results_{false};
|
||||
|
||||
|
||||
@@ -1,6 +1,18 @@
|
||||
"""Tests for the web_server OTA platform."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.web_server.ota import _web_server_ota_final_validate
|
||||
from esphome.const import CONF_ID, CONF_OTA, CONF_PLATFORM, CONF_WEB_SERVER
|
||||
from esphome.core import ID
|
||||
import esphome.final_validate as fv
|
||||
|
||||
|
||||
def test_web_server_ota_generated(generate_main: Callable[[str], str]) -> None:
|
||||
@@ -100,3 +112,144 @@ def test_web_server_ota_esp8266(generate_main: Callable[[str], str]) -> None:
|
||||
# Check web server OTA component is present
|
||||
assert "WebServerOTAComponent" in main_cpp
|
||||
assert "web_server::WebServerOTAComponent" in main_cpp
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("ota_configs", "expected_count", "warning_expected"),
|
||||
[
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web", is_manual=False),
|
||||
}
|
||||
],
|
||||
1,
|
||||
False,
|
||||
id="single_instance_no_merge",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_1", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_2", is_manual=False),
|
||||
},
|
||||
],
|
||||
1,
|
||||
True,
|
||||
id="two_instances_merged",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_1", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: "esphome",
|
||||
CONF_ID: ID("ota_esphome", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_2", is_manual=False),
|
||||
},
|
||||
],
|
||||
2,
|
||||
True,
|
||||
id="mixed_platforms_web_server_merged",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_web_server_ota_instance_merging(
|
||||
ota_configs: list[dict[str, Any]],
|
||||
expected_count: int,
|
||||
warning_expected: bool,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test web_server OTA instance merging behavior."""
|
||||
full_conf = {CONF_OTA: ota_configs.copy()}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_web_server_ota_final_validate({})
|
||||
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Verify total number of OTA platforms
|
||||
assert len(updated_conf[CONF_OTA]) == expected_count
|
||||
|
||||
# Verify warning
|
||||
if warning_expected:
|
||||
assert any(
|
||||
"Found and merged" in record.message
|
||||
and "web_server OTA" in record.message
|
||||
for record in caplog.records
|
||||
), "Expected merge warning not found in log"
|
||||
else:
|
||||
assert len(caplog.records) == 0, "Unexpected warnings logged"
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_web_server_ota_consistent_manual_ids(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that consistent manual IDs can be merged successfully."""
|
||||
ota_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web", is_manual=True),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web", is_manual=True),
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_OTA: ota_configs}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_web_server_ota_final_validate({})
|
||||
|
||||
updated_conf = fv.full_config.get()
|
||||
assert len(updated_conf[CONF_OTA]) == 1
|
||||
assert updated_conf[CONF_OTA][0][CONF_ID].id == "ota_web"
|
||||
assert any(
|
||||
"Found and merged" in record.message and "web_server OTA" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_web_server_ota_inconsistent_manual_ids() -> None:
|
||||
"""Test that inconsistent manual IDs raise an error."""
|
||||
ota_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_1", is_manual=True),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_WEB_SERVER,
|
||||
CONF_ID: ID("ota_web_2", is_manual=True),
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_OTA: ota_configs}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with pytest.raises(
|
||||
cv.Invalid,
|
||||
match="Found multiple web_server OTA configurations but id is inconsistent",
|
||||
):
|
||||
_web_server_ota_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
1
tests/component_tests/sntp/__init__.py
Normal file
1
tests/component_tests/sntp/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for SNTP component."""
|
||||
22
tests/component_tests/sntp/config/sntp_test.yaml
Normal file
22
tests/component_tests/sntp/config/sntp_test.yaml
Normal file
@@ -0,0 +1,22 @@
|
||||
esphome:
|
||||
name: sntp-test
|
||||
|
||||
esp32:
|
||||
board: esp32dev
|
||||
framework:
|
||||
type: esp-idf
|
||||
|
||||
wifi:
|
||||
ssid: "testssid"
|
||||
password: "testpassword"
|
||||
|
||||
# Test multiple SNTP instances that should be merged
|
||||
time:
|
||||
- platform: sntp
|
||||
servers:
|
||||
- 192.168.1.1
|
||||
- pool.ntp.org
|
||||
- platform: sntp
|
||||
servers:
|
||||
- pool.ntp.org
|
||||
- 192.168.1.2
|
||||
238
tests/component_tests/sntp/test_init.py
Normal file
238
tests/component_tests/sntp/test_init.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""Tests for SNTP time configuration validation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.sntp.time import CONF_SNTP, _sntp_final_validate
|
||||
from esphome.const import CONF_ID, CONF_PLATFORM, CONF_SERVERS, CONF_TIME
|
||||
from esphome.core import ID
|
||||
import esphome.final_validate as fv
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("time_configs", "expected_count", "expected_servers", "warning_messages"),
|
||||
[
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
}
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org"],
|
||||
[],
|
||||
id="single_instance_no_merge",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="two_instances_merged",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_preserves_order",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2", "pool2.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_3", is_manual=False),
|
||||
CONF_SERVERS: ["pool3.ntp.org"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
[
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): ['pool2.ntp.org', 'pool3.ntp.org']",
|
||||
"Found and merged 3 SNTP time configurations into one instance",
|
||||
],
|
||||
id="three_instances_drops_excess_servers",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: [
|
||||
"192.168.1.1",
|
||||
"pool.ntp.org",
|
||||
"pool.ntp.org",
|
||||
"192.168.1.1",
|
||||
],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_multiple_duplicates",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_sntp_instance_merging(
|
||||
time_configs: list[dict[str, Any]],
|
||||
expected_count: int,
|
||||
expected_servers: list[str],
|
||||
warning_messages: list[str],
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test SNTP instance merging behavior."""
|
||||
# Create a mock full config with time configs
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
# Set the context var
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
# Get the updated config
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Check if merging occurred
|
||||
if len(time_configs) > 1:
|
||||
# Verify only one SNTP instance remains
|
||||
sntp_instances = [
|
||||
tc
|
||||
for tc in updated_conf[CONF_TIME]
|
||||
if tc.get(CONF_PLATFORM) == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == expected_count
|
||||
|
||||
# Verify server list
|
||||
assert sntp_instances[0][CONF_SERVERS] == expected_servers
|
||||
|
||||
# Verify warnings
|
||||
for expected_msg in warning_messages:
|
||||
assert any(
|
||||
expected_msg in record.message for record in caplog.records
|
||||
), f"Expected warning message '{expected_msg}' not found in log"
|
||||
else:
|
||||
# Single instance should not trigger merging or warnings
|
||||
assert len(caplog.records) == 0
|
||||
# Config should be unchanged
|
||||
assert updated_conf[CONF_TIME] == time_configs
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_inconsistent_manual_ids() -> None:
|
||||
"""Test that inconsistent manual IDs raise an error."""
|
||||
# Create configs with manual IDs that are inconsistent
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with pytest.raises(
|
||||
cv.Invalid,
|
||||
match="Found multiple SNTP configurations but id is inconsistent",
|
||||
):
|
||||
_sntp_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_with_other_time_platforms(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that SNTP merging doesn't affect other time platforms."""
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: "homeassistant",
|
||||
CONF_ID: ID("homeassistant_time", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Should have 2 time platforms: 1 merged SNTP + 1 homeassistant
|
||||
assert len(updated_conf[CONF_TIME]) == 2
|
||||
|
||||
# Find the platforms
|
||||
platforms = {tc[CONF_PLATFORM] for tc in updated_conf[CONF_TIME]}
|
||||
assert platforms == {CONF_SNTP, "homeassistant"}
|
||||
|
||||
# Verify SNTP was merged
|
||||
sntp_instances = [
|
||||
tc for tc in updated_conf[CONF_TIME] if tc[CONF_PLATFORM] == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == 1
|
||||
assert sntp_instances[0][CONF_SERVERS] == ["192.168.1.1", "192.168.1.2"]
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
Reference in New Issue
Block a user