mirror of
https://github.com/esphome/esphome.git
synced 2025-11-15 14:25:45 +00:00
Compare commits
1 Commits
delay_acti
...
sntp_singl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8f2e91db3 |
4
.github/workflows/codeql.yml
vendored
4
.github/workflows/codeql.yml
vendored
@@ -58,7 +58,7 @@ jobs:
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
|
||||
uses: github/codeql-action/init@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
build-mode: ${{ matrix.build-mode }}
|
||||
@@ -86,6 +86,6 @@ jobs:
|
||||
exit 1
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
|
||||
uses: github/codeql-action/analyze@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
|
||||
with:
|
||||
category: "/language:${{matrix.language}}"
|
||||
|
||||
@@ -11,7 +11,7 @@ ci:
|
||||
repos:
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
# Ruff version.
|
||||
rev: v0.14.5
|
||||
rev: v0.14.4
|
||||
hooks:
|
||||
# Run the linter.
|
||||
- id: ruff
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import logging
|
||||
|
||||
import esphome.codegen as cg
|
||||
from esphome.components import time as time_
|
||||
from esphome.config_helpers import merge_config
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_ID,
|
||||
CONF_PLATFORM,
|
||||
CONF_SERVERS,
|
||||
CONF_TIME,
|
||||
PLATFORM_BK72XX,
|
||||
PLATFORM_ESP32,
|
||||
PLATFORM_ESP8266,
|
||||
@@ -12,13 +17,74 @@ from esphome.const import (
|
||||
PLATFORM_RTL87XX,
|
||||
)
|
||||
from esphome.core import CORE
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEPENDENCIES = ["network"]
|
||||
|
||||
CONF_SNTP = "sntp"
|
||||
|
||||
sntp_ns = cg.esphome_ns.namespace("sntp")
|
||||
SNTPComponent = sntp_ns.class_("SNTPComponent", time_.RealTimeClock)
|
||||
|
||||
DEFAULT_SERVERS = ["0.pool.ntp.org", "1.pool.ntp.org", "2.pool.ntp.org"]
|
||||
|
||||
|
||||
def _sntp_final_validate(config: ConfigType) -> None:
|
||||
"""Merge multiple SNTP instances into one, similar to OTA merging behavior."""
|
||||
full_conf = fv.full_config.get()
|
||||
time_confs = full_conf.get(CONF_TIME, [])
|
||||
|
||||
sntp_configs: list[ConfigType] = []
|
||||
other_time_configs: list[ConfigType] = []
|
||||
|
||||
for time_conf in time_confs:
|
||||
if time_conf.get(CONF_PLATFORM) == CONF_SNTP:
|
||||
sntp_configs.append(time_conf)
|
||||
else:
|
||||
other_time_configs.append(time_conf)
|
||||
|
||||
if len(sntp_configs) <= 1:
|
||||
return
|
||||
|
||||
# Merge all SNTP configs into the first one
|
||||
merged = sntp_configs[0]
|
||||
for sntp_conf in sntp_configs[1:]:
|
||||
# Validate that IDs are consistent if manually specified
|
||||
if merged[CONF_ID].is_manual and sntp_conf[CONF_ID].is_manual:
|
||||
raise cv.Invalid(
|
||||
f"Found multiple SNTP configurations but {CONF_ID} is inconsistent"
|
||||
)
|
||||
merged = merge_config(merged, sntp_conf)
|
||||
|
||||
# Deduplicate servers while preserving order
|
||||
servers = merged[CONF_SERVERS]
|
||||
unique_servers = list(dict.fromkeys(servers))
|
||||
|
||||
# Warn if we're dropping servers due to 3-server limit
|
||||
if len(unique_servers) > 3:
|
||||
dropped = unique_servers[3:]
|
||||
unique_servers = unique_servers[:3]
|
||||
_LOGGER.warning(
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): %s",
|
||||
dropped,
|
||||
)
|
||||
|
||||
merged[CONF_SERVERS] = unique_servers
|
||||
|
||||
_LOGGER.warning(
|
||||
"Found and merged %d SNTP time configurations into one instance",
|
||||
len(sntp_configs),
|
||||
)
|
||||
|
||||
# Replace time configs with merged SNTP + other time platforms
|
||||
other_time_configs.append(merged)
|
||||
full_conf[CONF_TIME] = other_time_configs
|
||||
fv.full_config.set(full_conf)
|
||||
|
||||
|
||||
CONFIG_SCHEMA = cv.All(
|
||||
time_.TIME_SCHEMA.extend(
|
||||
{
|
||||
@@ -40,6 +106,8 @@ CONFIG_SCHEMA = cv.All(
|
||||
),
|
||||
)
|
||||
|
||||
FINAL_VALIDATE_SCHEMA = _sntp_final_validate
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
servers = config[CONF_SERVERS]
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from logging import getLogger
|
||||
import math
|
||||
import re
|
||||
|
||||
@@ -36,8 +35,6 @@ from esphome.core import CORE, ID
|
||||
import esphome.final_validate as fv
|
||||
from esphome.yaml_util import make_data_base
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
CODEOWNERS = ["@esphome/core"]
|
||||
uart_ns = cg.esphome_ns.namespace("uart")
|
||||
UARTComponent = uart_ns.class_("UARTComponent")
|
||||
@@ -133,21 +130,6 @@ def validate_host_config(config):
|
||||
return config
|
||||
|
||||
|
||||
def validate_rx_buffer_size(config):
|
||||
if CORE.is_esp32:
|
||||
# ESP32 UART hardware FIFO is 128 bytes (LP UART is 16 bytes, but we use 128 as safe minimum)
|
||||
# rx_buffer_size must be greater than the hardware FIFO length
|
||||
min_buffer_size = 128
|
||||
if config[CONF_RX_BUFFER_SIZE] <= min_buffer_size:
|
||||
_LOGGER.warning(
|
||||
"UART rx_buffer_size (%d bytes) is too small and must be greater than the hardware "
|
||||
"FIFO size (%d bytes). The buffer size will be automatically adjusted at runtime.",
|
||||
config[CONF_RX_BUFFER_SIZE],
|
||||
min_buffer_size,
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
def _uart_declare_type(value):
|
||||
if CORE.is_esp8266:
|
||||
return cv.declare_id(ESP8266UartComponent)(value)
|
||||
@@ -265,7 +247,6 @@ CONFIG_SCHEMA = cv.All(
|
||||
).extend(cv.COMPONENT_SCHEMA),
|
||||
cv.has_at_least_one_key(CONF_TX_PIN, CONF_RX_PIN, CONF_PORT),
|
||||
validate_host_config,
|
||||
validate_rx_buffer_size,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -91,16 +91,6 @@ void IDFUARTComponent::setup() {
|
||||
this->uart_num_ = static_cast<uart_port_t>(next_uart_num++);
|
||||
this->lock_ = xSemaphoreCreateMutex();
|
||||
|
||||
#if (SOC_UART_LP_NUM >= 1)
|
||||
size_t fifo_len = ((this->uart_num_ < SOC_UART_HP_NUM) ? SOC_UART_FIFO_LEN : SOC_LP_UART_FIFO_LEN);
|
||||
#else
|
||||
size_t fifo_len = SOC_UART_FIFO_LEN;
|
||||
#endif
|
||||
if (this->rx_buffer_size_ <= fifo_len) {
|
||||
ESP_LOGW(TAG, "rx_buffer_size is too small, must be greater than %zu", fifo_len);
|
||||
this->rx_buffer_size_ = fifo_len * 2;
|
||||
}
|
||||
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
|
||||
this->load_settings(false);
|
||||
@@ -247,12 +237,8 @@ void IDFUARTComponent::set_rx_timeout(size_t rx_timeout) {
|
||||
|
||||
void IDFUARTComponent::write_array(const uint8_t *data, size_t len) {
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
int32_t write_len = uart_write_bytes(this->uart_num_, data, len);
|
||||
uart_write_bytes(this->uart_num_, data, len);
|
||||
xSemaphoreGive(this->lock_);
|
||||
if (write_len != (int32_t) len) {
|
||||
ESP_LOGW(TAG, "uart_write_bytes failed: %d != %zu", write_len, len);
|
||||
this->mark_failed();
|
||||
}
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(UART_DIRECTION_TX, data[i]);
|
||||
@@ -281,7 +267,6 @@ bool IDFUARTComponent::peek_byte(uint8_t *data) {
|
||||
|
||||
bool IDFUARTComponent::read_array(uint8_t *data, size_t len) {
|
||||
size_t length_to_read = len;
|
||||
int32_t read_len = 0;
|
||||
if (!this->check_read_timeout_(len))
|
||||
return false;
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
@@ -292,31 +277,25 @@ bool IDFUARTComponent::read_array(uint8_t *data, size_t len) {
|
||||
this->has_peek_ = false;
|
||||
}
|
||||
if (length_to_read > 0)
|
||||
read_len = uart_read_bytes(this->uart_num_, data, length_to_read, 20 / portTICK_PERIOD_MS);
|
||||
uart_read_bytes(this->uart_num_, data, length_to_read, 20 / portTICK_PERIOD_MS);
|
||||
xSemaphoreGive(this->lock_);
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(UART_DIRECTION_RX, data[i]);
|
||||
}
|
||||
#endif
|
||||
return read_len == (int32_t) length_to_read;
|
||||
return true;
|
||||
}
|
||||
|
||||
int IDFUARTComponent::available() {
|
||||
size_t available = 0;
|
||||
esp_err_t err;
|
||||
size_t available;
|
||||
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
err = uart_get_buffered_data_len(this->uart_num_, &available);
|
||||
uart_get_buffered_data_len(this->uart_num_, &available);
|
||||
if (this->has_peek_)
|
||||
available++;
|
||||
xSemaphoreGive(this->lock_);
|
||||
|
||||
if (err != ESP_OK) {
|
||||
ESP_LOGW(TAG, "uart_get_buffered_data_len failed: %s", esp_err_to_name(err));
|
||||
this->mark_failed();
|
||||
}
|
||||
if (this->has_peek_) {
|
||||
available++;
|
||||
}
|
||||
return available;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from esphome.components.network import (
|
||||
from esphome.components.psram import is_guaranteed as psram_is_guaranteed
|
||||
from esphome.config_helpers import filter_source_files_from_platform
|
||||
import esphome.config_validation as cv
|
||||
from esphome.config_validation import only_with_esp_idf
|
||||
from esphome.const import (
|
||||
CONF_AP,
|
||||
CONF_BSSID,
|
||||
@@ -351,7 +352,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
single=True
|
||||
),
|
||||
cv.Optional(CONF_USE_PSRAM): cv.All(
|
||||
cv.only_on_esp32, cv.requires_component("psram"), cv.boolean
|
||||
only_with_esp_idf, cv.requires_component("psram"), cv.boolean
|
||||
),
|
||||
}
|
||||
),
|
||||
|
||||
@@ -178,6 +178,7 @@ template<typename... Ts> class DelayAction : public Action<Ts...>, public Compon
|
||||
TEMPLATABLE_VALUE(uint32_t, delay)
|
||||
|
||||
void play_complex(const Ts &...x) override {
|
||||
auto f = std::bind(&DelayAction<Ts...>::play_next_, this, x...);
|
||||
this->num_running_++;
|
||||
|
||||
// If num_running_ > 1, we have multiple instances running in parallel
|
||||
@@ -186,22 +187,9 @@ template<typename... Ts> class DelayAction : public Action<Ts...>, public Compon
|
||||
// WARNING: This can accumulate delays if scripts are triggered faster than they complete!
|
||||
// Users should set max_runs on parallel scripts to limit concurrent executions.
|
||||
// Issue #10264: This is a workaround for parallel script delays interfering with each other.
|
||||
|
||||
// Optimization: For no-argument delays (most common case), use direct lambda
|
||||
// instead of std::bind to avoid bind overhead (~16 bytes heap + faster execution)
|
||||
if constexpr (sizeof...(Ts) == 0) {
|
||||
App.scheduler.set_timer_common_(
|
||||
this, Scheduler::SchedulerItem::TIMEOUT,
|
||||
/* is_static_string= */ true, "delay", this->delay_.value(), [this]() { this->play_next_(); },
|
||||
/* is_retry= */ false, /* skip_cancel= */ this->num_running_ > 1);
|
||||
} else {
|
||||
// For delays with arguments, use std::bind to preserve argument values
|
||||
// Arguments must be copied because original references may be invalid after delay
|
||||
auto f = std::bind(&DelayAction<Ts...>::play_next_, this, x...);
|
||||
App.scheduler.set_timer_common_(this, Scheduler::SchedulerItem::TIMEOUT,
|
||||
/* is_static_string= */ true, "delay", this->delay_.value(x...), std::move(f),
|
||||
/* is_retry= */ false, /* skip_cancel= */ this->num_running_ > 1);
|
||||
}
|
||||
App.scheduler.set_timer_common_(this, Scheduler::SchedulerItem::TIMEOUT,
|
||||
/* is_static_string= */ true, "delay", this->delay_.value(x...), std::move(f),
|
||||
/* is_retry= */ false, /* skip_cancel= */ this->num_running_ > 1);
|
||||
}
|
||||
float get_setup_priority() const override { return setup_priority::HARDWARE; }
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
pylint==4.0.3
|
||||
flake8==7.3.0 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.14.5 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.14.4 # also change in .pre-commit-config.yaml when updating
|
||||
pyupgrade==3.21.1 # also change in .pre-commit-config.yaml when updating
|
||||
pre-commit
|
||||
|
||||
|
||||
1
tests/component_tests/sntp/__init__.py
Normal file
1
tests/component_tests/sntp/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for SNTP component."""
|
||||
22
tests/component_tests/sntp/config/sntp_test.yaml
Normal file
22
tests/component_tests/sntp/config/sntp_test.yaml
Normal file
@@ -0,0 +1,22 @@
|
||||
esphome:
|
||||
name: sntp-test
|
||||
|
||||
esp32:
|
||||
board: esp32dev
|
||||
framework:
|
||||
type: esp-idf
|
||||
|
||||
wifi:
|
||||
ssid: "testssid"
|
||||
password: "testpassword"
|
||||
|
||||
# Test multiple SNTP instances that should be merged
|
||||
time:
|
||||
- platform: sntp
|
||||
servers:
|
||||
- 192.168.1.1
|
||||
- pool.ntp.org
|
||||
- platform: sntp
|
||||
servers:
|
||||
- pool.ntp.org
|
||||
- 192.168.1.2
|
||||
238
tests/component_tests/sntp/test_init.py
Normal file
238
tests/component_tests/sntp/test_init.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""Tests for SNTP time configuration validation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.sntp.time import CONF_SNTP, _sntp_final_validate
|
||||
from esphome.const import CONF_ID, CONF_PLATFORM, CONF_SERVERS, CONF_TIME
|
||||
from esphome.core import ID
|
||||
import esphome.final_validate as fv
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("time_configs", "expected_count", "expected_servers", "warning_messages"),
|
||||
[
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
}
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org"],
|
||||
[],
|
||||
id="single_instance_no_merge",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="two_instances_merged",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_preserves_order",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2", "pool2.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_3", is_manual=False),
|
||||
CONF_SERVERS: ["pool3.ntp.org"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
[
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): ['pool2.ntp.org', 'pool3.ntp.org']",
|
||||
"Found and merged 3 SNTP time configurations into one instance",
|
||||
],
|
||||
id="three_instances_drops_excess_servers",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: [
|
||||
"192.168.1.1",
|
||||
"pool.ntp.org",
|
||||
"pool.ntp.org",
|
||||
"192.168.1.1",
|
||||
],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_multiple_duplicates",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_sntp_instance_merging(
|
||||
time_configs: list[dict[str, Any]],
|
||||
expected_count: int,
|
||||
expected_servers: list[str],
|
||||
warning_messages: list[str],
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test SNTP instance merging behavior."""
|
||||
# Create a mock full config with time configs
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
# Set the context var
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
# Get the updated config
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Check if merging occurred
|
||||
if len(time_configs) > 1:
|
||||
# Verify only one SNTP instance remains
|
||||
sntp_instances = [
|
||||
tc
|
||||
for tc in updated_conf[CONF_TIME]
|
||||
if tc.get(CONF_PLATFORM) == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == expected_count
|
||||
|
||||
# Verify server list
|
||||
assert sntp_instances[0][CONF_SERVERS] == expected_servers
|
||||
|
||||
# Verify warnings
|
||||
for expected_msg in warning_messages:
|
||||
assert any(
|
||||
expected_msg in record.message for record in caplog.records
|
||||
), f"Expected warning message '{expected_msg}' not found in log"
|
||||
else:
|
||||
# Single instance should not trigger merging or warnings
|
||||
assert len(caplog.records) == 0
|
||||
# Config should be unchanged
|
||||
assert updated_conf[CONF_TIME] == time_configs
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_inconsistent_manual_ids() -> None:
|
||||
"""Test that inconsistent manual IDs raise an error."""
|
||||
# Create configs with manual IDs that are inconsistent
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with pytest.raises(
|
||||
cv.Invalid,
|
||||
match="Found multiple SNTP configurations but id is inconsistent",
|
||||
):
|
||||
_sntp_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_with_other_time_platforms(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that SNTP merging doesn't affect other time platforms."""
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: "homeassistant",
|
||||
CONF_ID: ID("homeassistant_time", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Should have 2 time platforms: 1 merged SNTP + 1 homeassistant
|
||||
assert len(updated_conf[CONF_TIME]) == 2
|
||||
|
||||
# Find the platforms
|
||||
platforms = {tc[CONF_PLATFORM] for tc in updated_conf[CONF_TIME]}
|
||||
assert platforms == {CONF_SNTP, "homeassistant"}
|
||||
|
||||
# Verify SNTP was merged
|
||||
sntp_instances = [
|
||||
tc for tc in updated_conf[CONF_TIME] if tc[CONF_PLATFORM] == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == 1
|
||||
assert sntp_instances[0][CONF_SERVERS] == ["192.168.1.1", "192.168.1.2"]
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
Reference in New Issue
Block a user