mirror of
https://github.com/esphome/esphome.git
synced 2025-11-16 06:45:48 +00:00
Compare commits
15 Commits
sntp_singl
...
de_dupe_la
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6378990cd | ||
|
|
e9ff4d3c4e | ||
|
|
4081345013 | ||
|
|
5989b78e93 | ||
|
|
5727043cec | ||
|
|
1441c7fab2 | ||
|
|
62248b6bba | ||
|
|
b7c105125e | ||
|
|
11de948698 | ||
|
|
6ade327cde | ||
|
|
cc1b547ad2 | ||
|
|
1df996601d | ||
|
|
c32891ec02 | ||
|
|
2bf6d48fcf | ||
|
|
e49a943cf7 |
4
.github/workflows/codeql.yml
vendored
4
.github/workflows/codeql.yml
vendored
@@ -58,7 +58,7 @@ jobs:
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
|
||||
uses: github/codeql-action/init@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
build-mode: ${{ matrix.build-mode }}
|
||||
@@ -86,6 +86,6 @@ jobs:
|
||||
exit 1
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
|
||||
uses: github/codeql-action/analyze@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
|
||||
with:
|
||||
category: "/language:${{matrix.language}}"
|
||||
|
||||
@@ -11,7 +11,7 @@ ci:
|
||||
repos:
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
# Ruff version.
|
||||
rev: v0.14.4
|
||||
rev: v0.14.5
|
||||
hooks:
|
||||
# Run the linter.
|
||||
- id: ruff
|
||||
|
||||
@@ -1,14 +1,9 @@
|
||||
import logging
|
||||
|
||||
import esphome.codegen as cg
|
||||
from esphome.components import time as time_
|
||||
from esphome.config_helpers import merge_config
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import (
|
||||
CONF_ID,
|
||||
CONF_PLATFORM,
|
||||
CONF_SERVERS,
|
||||
CONF_TIME,
|
||||
PLATFORM_BK72XX,
|
||||
PLATFORM_ESP32,
|
||||
PLATFORM_ESP8266,
|
||||
@@ -17,74 +12,13 @@ from esphome.const import (
|
||||
PLATFORM_RTL87XX,
|
||||
)
|
||||
from esphome.core import CORE
|
||||
import esphome.final_validate as fv
|
||||
from esphome.types import ConfigType
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEPENDENCIES = ["network"]
|
||||
|
||||
CONF_SNTP = "sntp"
|
||||
|
||||
sntp_ns = cg.esphome_ns.namespace("sntp")
|
||||
SNTPComponent = sntp_ns.class_("SNTPComponent", time_.RealTimeClock)
|
||||
|
||||
DEFAULT_SERVERS = ["0.pool.ntp.org", "1.pool.ntp.org", "2.pool.ntp.org"]
|
||||
|
||||
|
||||
def _sntp_final_validate(config: ConfigType) -> None:
|
||||
"""Merge multiple SNTP instances into one, similar to OTA merging behavior."""
|
||||
full_conf = fv.full_config.get()
|
||||
time_confs = full_conf.get(CONF_TIME, [])
|
||||
|
||||
sntp_configs: list[ConfigType] = []
|
||||
other_time_configs: list[ConfigType] = []
|
||||
|
||||
for time_conf in time_confs:
|
||||
if time_conf.get(CONF_PLATFORM) == CONF_SNTP:
|
||||
sntp_configs.append(time_conf)
|
||||
else:
|
||||
other_time_configs.append(time_conf)
|
||||
|
||||
if len(sntp_configs) <= 1:
|
||||
return
|
||||
|
||||
# Merge all SNTP configs into the first one
|
||||
merged = sntp_configs[0]
|
||||
for sntp_conf in sntp_configs[1:]:
|
||||
# Validate that IDs are consistent if manually specified
|
||||
if merged[CONF_ID].is_manual and sntp_conf[CONF_ID].is_manual:
|
||||
raise cv.Invalid(
|
||||
f"Found multiple SNTP configurations but {CONF_ID} is inconsistent"
|
||||
)
|
||||
merged = merge_config(merged, sntp_conf)
|
||||
|
||||
# Deduplicate servers while preserving order
|
||||
servers = merged[CONF_SERVERS]
|
||||
unique_servers = list(dict.fromkeys(servers))
|
||||
|
||||
# Warn if we're dropping servers due to 3-server limit
|
||||
if len(unique_servers) > 3:
|
||||
dropped = unique_servers[3:]
|
||||
unique_servers = unique_servers[:3]
|
||||
_LOGGER.warning(
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): %s",
|
||||
dropped,
|
||||
)
|
||||
|
||||
merged[CONF_SERVERS] = unique_servers
|
||||
|
||||
_LOGGER.warning(
|
||||
"Found and merged %d SNTP time configurations into one instance",
|
||||
len(sntp_configs),
|
||||
)
|
||||
|
||||
# Replace time configs with merged SNTP + other time platforms
|
||||
other_time_configs.append(merged)
|
||||
full_conf[CONF_TIME] = other_time_configs
|
||||
fv.full_config.set(full_conf)
|
||||
|
||||
|
||||
CONFIG_SCHEMA = cv.All(
|
||||
time_.TIME_SCHEMA.extend(
|
||||
{
|
||||
@@ -106,8 +40,6 @@ CONFIG_SCHEMA = cv.All(
|
||||
),
|
||||
)
|
||||
|
||||
FINAL_VALIDATE_SCHEMA = _sntp_final_validate
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
servers = config[CONF_SERVERS]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from logging import getLogger
|
||||
import math
|
||||
import re
|
||||
|
||||
@@ -35,6 +36,8 @@ from esphome.core import CORE, ID
|
||||
import esphome.final_validate as fv
|
||||
from esphome.yaml_util import make_data_base
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
||||
CODEOWNERS = ["@esphome/core"]
|
||||
uart_ns = cg.esphome_ns.namespace("uart")
|
||||
UARTComponent = uart_ns.class_("UARTComponent")
|
||||
@@ -130,6 +133,21 @@ def validate_host_config(config):
|
||||
return config
|
||||
|
||||
|
||||
def validate_rx_buffer_size(config):
|
||||
if CORE.is_esp32:
|
||||
# ESP32 UART hardware FIFO is 128 bytes (LP UART is 16 bytes, but we use 128 as safe minimum)
|
||||
# rx_buffer_size must be greater than the hardware FIFO length
|
||||
min_buffer_size = 128
|
||||
if config[CONF_RX_BUFFER_SIZE] <= min_buffer_size:
|
||||
_LOGGER.warning(
|
||||
"UART rx_buffer_size (%d bytes) is too small and must be greater than the hardware "
|
||||
"FIFO size (%d bytes). The buffer size will be automatically adjusted at runtime.",
|
||||
config[CONF_RX_BUFFER_SIZE],
|
||||
min_buffer_size,
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
def _uart_declare_type(value):
|
||||
if CORE.is_esp8266:
|
||||
return cv.declare_id(ESP8266UartComponent)(value)
|
||||
@@ -247,6 +265,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
).extend(cv.COMPONENT_SCHEMA),
|
||||
cv.has_at_least_one_key(CONF_TX_PIN, CONF_RX_PIN, CONF_PORT),
|
||||
validate_host_config,
|
||||
validate_rx_buffer_size,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -91,6 +91,16 @@ void IDFUARTComponent::setup() {
|
||||
this->uart_num_ = static_cast<uart_port_t>(next_uart_num++);
|
||||
this->lock_ = xSemaphoreCreateMutex();
|
||||
|
||||
#if (SOC_UART_LP_NUM >= 1)
|
||||
size_t fifo_len = ((this->uart_num_ < SOC_UART_HP_NUM) ? SOC_UART_FIFO_LEN : SOC_LP_UART_FIFO_LEN);
|
||||
#else
|
||||
size_t fifo_len = SOC_UART_FIFO_LEN;
|
||||
#endif
|
||||
if (this->rx_buffer_size_ <= fifo_len) {
|
||||
ESP_LOGW(TAG, "rx_buffer_size is too small, must be greater than %zu", fifo_len);
|
||||
this->rx_buffer_size_ = fifo_len * 2;
|
||||
}
|
||||
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
|
||||
this->load_settings(false);
|
||||
@@ -237,8 +247,12 @@ void IDFUARTComponent::set_rx_timeout(size_t rx_timeout) {
|
||||
|
||||
void IDFUARTComponent::write_array(const uint8_t *data, size_t len) {
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
uart_write_bytes(this->uart_num_, data, len);
|
||||
int32_t write_len = uart_write_bytes(this->uart_num_, data, len);
|
||||
xSemaphoreGive(this->lock_);
|
||||
if (write_len != (int32_t) len) {
|
||||
ESP_LOGW(TAG, "uart_write_bytes failed: %d != %zu", write_len, len);
|
||||
this->mark_failed();
|
||||
}
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(UART_DIRECTION_TX, data[i]);
|
||||
@@ -267,6 +281,7 @@ bool IDFUARTComponent::peek_byte(uint8_t *data) {
|
||||
|
||||
bool IDFUARTComponent::read_array(uint8_t *data, size_t len) {
|
||||
size_t length_to_read = len;
|
||||
int32_t read_len = 0;
|
||||
if (!this->check_read_timeout_(len))
|
||||
return false;
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
@@ -277,25 +292,31 @@ bool IDFUARTComponent::read_array(uint8_t *data, size_t len) {
|
||||
this->has_peek_ = false;
|
||||
}
|
||||
if (length_to_read > 0)
|
||||
uart_read_bytes(this->uart_num_, data, length_to_read, 20 / portTICK_PERIOD_MS);
|
||||
read_len = uart_read_bytes(this->uart_num_, data, length_to_read, 20 / portTICK_PERIOD_MS);
|
||||
xSemaphoreGive(this->lock_);
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(UART_DIRECTION_RX, data[i]);
|
||||
}
|
||||
#endif
|
||||
return true;
|
||||
return read_len == (int32_t) length_to_read;
|
||||
}
|
||||
|
||||
int IDFUARTComponent::available() {
|
||||
size_t available;
|
||||
size_t available = 0;
|
||||
esp_err_t err;
|
||||
|
||||
xSemaphoreTake(this->lock_, portMAX_DELAY);
|
||||
uart_get_buffered_data_len(this->uart_num_, &available);
|
||||
if (this->has_peek_)
|
||||
available++;
|
||||
err = uart_get_buffered_data_len(this->uart_num_, &available);
|
||||
xSemaphoreGive(this->lock_);
|
||||
|
||||
if (err != ESP_OK) {
|
||||
ESP_LOGW(TAG, "uart_get_buffered_data_len failed: %s", esp_err_to_name(err));
|
||||
this->mark_failed();
|
||||
}
|
||||
if (this->has_peek_) {
|
||||
available++;
|
||||
}
|
||||
return available;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ from esphome.components.network import (
|
||||
from esphome.components.psram import is_guaranteed as psram_is_guaranteed
|
||||
from esphome.config_helpers import filter_source_files_from_platform
|
||||
import esphome.config_validation as cv
|
||||
from esphome.config_validation import only_with_esp_idf
|
||||
from esphome.const import (
|
||||
CONF_AP,
|
||||
CONF_BSSID,
|
||||
@@ -352,7 +351,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
single=True
|
||||
),
|
||||
cv.Optional(CONF_USE_PSRAM): cv.All(
|
||||
only_with_esp_idf, cv.requires_component("psram"), cv.boolean
|
||||
cv.only_on_esp32, cv.requires_component("psram"), cv.boolean
|
||||
),
|
||||
}
|
||||
),
|
||||
|
||||
@@ -19,11 +19,21 @@ from esphome.core import (
|
||||
TimePeriodNanoseconds,
|
||||
TimePeriodSeconds,
|
||||
)
|
||||
from esphome.coroutine import CoroPriority, coroutine_with_priority
|
||||
from esphome.helpers import cpp_string_escape, indent_all_but_first_and_last
|
||||
from esphome.types import Expression, SafeExpType, TemplateArgsType
|
||||
from esphome.util import OrderedDict
|
||||
from esphome.yaml_util import ESPHomeDataBase
|
||||
|
||||
# Keys for lambda deduplication storage in CORE.data
|
||||
_KEY_LAMBDA_DEDUP = "lambda_dedup"
|
||||
_KEY_LAMBDA_DEDUP_DECLARATIONS = "lambda_dedup_declarations"
|
||||
|
||||
# Regex patterns for static variable detection (compiled once)
|
||||
_RE_CPP_SINGLE_LINE_COMMENT = re.compile(r"//.*?$", re.MULTILINE)
|
||||
_RE_CPP_MULTI_LINE_COMMENT = re.compile(r"/\*.*?\*/", re.DOTALL)
|
||||
_RE_STATIC_VARIABLE = re.compile(r"\bstatic\s+(?!cast|assert|pointer_cast)\w+\s+\w+")
|
||||
|
||||
|
||||
class RawExpression(Expression):
|
||||
__slots__ = ("text",)
|
||||
@@ -188,7 +198,7 @@ class LambdaExpression(Expression):
|
||||
|
||||
def __init__(
|
||||
self, parts, parameters, capture: str = "=", return_type=None, source=None
|
||||
):
|
||||
) -> None:
|
||||
self.parts = parts
|
||||
if not isinstance(parameters, ParameterListExpression):
|
||||
parameters = ParameterListExpression(*parameters)
|
||||
@@ -197,16 +207,21 @@ class LambdaExpression(Expression):
|
||||
self.capture = capture
|
||||
self.return_type = safe_exp(return_type) if return_type is not None else None
|
||||
|
||||
def __str__(self):
|
||||
def format_body(self) -> str:
|
||||
"""Format the lambda body with source directive and content."""
|
||||
body = ""
|
||||
if self.source is not None:
|
||||
body += f"{self.source.as_line_directive}\n"
|
||||
body += self.content
|
||||
return body
|
||||
|
||||
def __str__(self) -> str:
|
||||
# Stateless lambdas (empty capture) implicitly convert to function pointers
|
||||
# when assigned to function pointer types - no unary + needed
|
||||
cpp = f"[{self.capture}]({self.parameters})"
|
||||
if self.return_type is not None:
|
||||
cpp += f" -> {self.return_type}"
|
||||
cpp += " {\n"
|
||||
if self.source is not None:
|
||||
cpp += f"{self.source.as_line_directive}\n"
|
||||
cpp += f"{self.content}\n}}"
|
||||
cpp += f" {{\n{self.format_body()}\n}}"
|
||||
return indent_all_but_first_and_last(cpp)
|
||||
|
||||
@property
|
||||
@@ -214,6 +229,37 @@ class LambdaExpression(Expression):
|
||||
return "".join(str(part) for part in self.parts)
|
||||
|
||||
|
||||
class SharedFunctionLambdaExpression(LambdaExpression):
|
||||
"""A lambda expression that references a shared deduplicated function.
|
||||
|
||||
This class wraps a function pointer but maintains the LambdaExpression
|
||||
interface so calling code works unchanged.
|
||||
"""
|
||||
|
||||
__slots__ = ("_func_name",)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
func_name: str,
|
||||
parameters: TemplateArgsType,
|
||||
return_type: SafeExpType | None = None,
|
||||
) -> None:
|
||||
# Initialize parent with empty parts since we're just a function reference
|
||||
super().__init__(
|
||||
[], parameters, capture="", return_type=return_type, source=None
|
||||
)
|
||||
self._func_name = func_name
|
||||
|
||||
def __str__(self) -> str:
|
||||
# Just return the function name - it's already a function pointer
|
||||
return self._func_name
|
||||
|
||||
@property
|
||||
def content(self) -> str:
|
||||
# No content, just a function reference
|
||||
return ""
|
||||
|
||||
|
||||
# pylint: disable=abstract-method
|
||||
class Literal(Expression, metaclass=abc.ABCMeta):
|
||||
__slots__ = ()
|
||||
@@ -583,6 +629,25 @@ def add_global(expression: SafeExpType | Statement, prepend: bool = False):
|
||||
CORE.add_global(expression, prepend)
|
||||
|
||||
|
||||
@coroutine_with_priority(CoroPriority.FINAL)
|
||||
async def flush_lambda_dedup_declarations() -> None:
|
||||
"""Flush all deferred lambda deduplication declarations to global scope.
|
||||
|
||||
This is a coroutine that runs with FINAL priority (after all components)
|
||||
to ensure all referenced variables are declared before the shared
|
||||
lambda functions that use them.
|
||||
"""
|
||||
if _KEY_LAMBDA_DEDUP_DECLARATIONS not in CORE.data:
|
||||
return
|
||||
|
||||
declarations = CORE.data[_KEY_LAMBDA_DEDUP_DECLARATIONS]
|
||||
for func_declaration in declarations:
|
||||
add_global(RawStatement(func_declaration))
|
||||
|
||||
# Clear the list so we don't add them again
|
||||
CORE.data[_KEY_LAMBDA_DEDUP_DECLARATIONS] = []
|
||||
|
||||
|
||||
def add_library(name: str, version: str | None, repository: str | None = None):
|
||||
"""Add a library to the codegen library storage.
|
||||
|
||||
@@ -656,6 +721,93 @@ async def get_variable_with_full_id(id_: ID) -> tuple[ID, "MockObj"]:
|
||||
return await CORE.get_variable_with_full_id(id_)
|
||||
|
||||
|
||||
def _has_static_variables(code: str) -> bool:
|
||||
"""Check if code contains static variable definitions.
|
||||
|
||||
Static variables in lambdas should not be deduplicated because each lambda
|
||||
instance should have its own static variable state.
|
||||
|
||||
Args:
|
||||
code: The lambda body code to check
|
||||
|
||||
Returns:
|
||||
True if code contains static variable definitions
|
||||
"""
|
||||
# Remove C++ comments to avoid false positives
|
||||
# Remove single-line comments (// ...)
|
||||
code_no_comments = _RE_CPP_SINGLE_LINE_COMMENT.sub("", code)
|
||||
# Remove multi-line comments (/* ... */)
|
||||
code_no_comments = _RE_CPP_MULTI_LINE_COMMENT.sub("", code_no_comments)
|
||||
|
||||
# Match: static <type> <identifier>
|
||||
# But not: static_cast, static_assert, static_pointer_cast
|
||||
return bool(_RE_STATIC_VARIABLE.search(code_no_comments))
|
||||
|
||||
|
||||
def _get_shared_lambda_name(lambda_expr: LambdaExpression) -> str | None:
|
||||
"""Get the shared function name for a lambda expression.
|
||||
|
||||
If an identical lambda was already generated, returns the existing shared
|
||||
function name. Otherwise, creates a new shared function and returns its name.
|
||||
|
||||
Lambdas with static variables are not deduplicated to preserve their
|
||||
independent state.
|
||||
|
||||
Args:
|
||||
lambda_expr: The lambda expression to deduplicate
|
||||
|
||||
Returns:
|
||||
The name of the shared function for this lambda (either existing or newly created),
|
||||
or None if the lambda should not be deduplicated (e.g., contains static variables)
|
||||
"""
|
||||
# Create a unique key from the lambda content, parameters, and return type
|
||||
content = lambda_expr.content
|
||||
|
||||
# Don't deduplicate lambdas with static variables - each instance needs its own state
|
||||
if _has_static_variables(content):
|
||||
return None
|
||||
param_str = str(lambda_expr.parameters)
|
||||
return_str = (
|
||||
str(lambda_expr.return_type) if lambda_expr.return_type is not None else "void"
|
||||
)
|
||||
|
||||
# Use tuple of (content, params, return_type) as key
|
||||
lambda_key = (content, param_str, return_str)
|
||||
|
||||
# Initialize deduplication storage in CORE.data if not exists
|
||||
if _KEY_LAMBDA_DEDUP not in CORE.data:
|
||||
CORE.data[_KEY_LAMBDA_DEDUP] = {}
|
||||
# Register the flush job to run after all components (FINAL priority)
|
||||
# This ensures all variables are declared before shared lambda functions
|
||||
CORE.add_job(flush_lambda_dedup_declarations)
|
||||
|
||||
lambda_cache = CORE.data[_KEY_LAMBDA_DEDUP]
|
||||
|
||||
# Check if we've seen this lambda before
|
||||
if lambda_key in lambda_cache:
|
||||
# Return name of existing shared function
|
||||
return lambda_cache[lambda_key]
|
||||
|
||||
# First occurrence - create a shared function
|
||||
# Use the cache size as the function number
|
||||
func_name = f"shared_lambda_{len(lambda_cache)}"
|
||||
|
||||
# Build the function declaration using lambda's body formatting
|
||||
func_declaration = (
|
||||
f"{return_str} {func_name}({param_str}) {{\n{lambda_expr.format_body()}\n}}"
|
||||
)
|
||||
|
||||
# Store the declaration to be added later (after all variable declarations)
|
||||
# We can't add it immediately because it might reference variables not yet declared
|
||||
CORE.data.setdefault(_KEY_LAMBDA_DEDUP_DECLARATIONS, []).append(func_declaration)
|
||||
|
||||
# Store in cache
|
||||
lambda_cache[lambda_key] = func_name
|
||||
|
||||
# Return the function name (this is the first occurrence, but we still generate shared function)
|
||||
return func_name
|
||||
|
||||
|
||||
async def process_lambda(
|
||||
value: Lambda,
|
||||
parameters: TemplateArgsType,
|
||||
@@ -713,6 +865,19 @@ async def process_lambda(
|
||||
location.line += value.content_offset
|
||||
else:
|
||||
location = None
|
||||
|
||||
# Lambda deduplication: Only deduplicate stateless lambdas (empty capture).
|
||||
# Stateful lambdas cannot be shared as they capture different contexts.
|
||||
# Lambdas with static variables are also not deduplicated to preserve independent state.
|
||||
if capture == "":
|
||||
lambda_expr = LambdaExpression(
|
||||
parts, parameters, capture, return_type, location
|
||||
)
|
||||
func_name = _get_shared_lambda_name(lambda_expr)
|
||||
if func_name is not None:
|
||||
# Return a shared function reference instead of inline lambda
|
||||
return SharedFunctionLambdaExpression(func_name, parameters, return_type)
|
||||
|
||||
return LambdaExpression(parts, parameters, capture, return_type, location)
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
pylint==4.0.3
|
||||
flake8==7.3.0 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.14.4 # also change in .pre-commit-config.yaml when updating
|
||||
ruff==0.14.5 # also change in .pre-commit-config.yaml when updating
|
||||
pyupgrade==3.21.1 # also change in .pre-commit-config.yaml when updating
|
||||
pre-commit
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
"""Tests for SNTP component."""
|
||||
@@ -1,22 +0,0 @@
|
||||
esphome:
|
||||
name: sntp-test
|
||||
|
||||
esp32:
|
||||
board: esp32dev
|
||||
framework:
|
||||
type: esp-idf
|
||||
|
||||
wifi:
|
||||
ssid: "testssid"
|
||||
password: "testpassword"
|
||||
|
||||
# Test multiple SNTP instances that should be merged
|
||||
time:
|
||||
- platform: sntp
|
||||
servers:
|
||||
- 192.168.1.1
|
||||
- pool.ntp.org
|
||||
- platform: sntp
|
||||
servers:
|
||||
- pool.ntp.org
|
||||
- 192.168.1.2
|
||||
@@ -1,238 +0,0 @@
|
||||
"""Tests for SNTP time configuration validation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv
|
||||
from esphome.components.sntp.time import CONF_SNTP, _sntp_final_validate
|
||||
from esphome.const import CONF_ID, CONF_PLATFORM, CONF_SERVERS, CONF_TIME
|
||||
from esphome.core import ID
|
||||
import esphome.final_validate as fv
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("time_configs", "expected_count", "expected_servers", "warning_messages"),
|
||||
[
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
}
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org"],
|
||||
[],
|
||||
id="single_instance_no_merge",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="two_instances_merged",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_preserves_order",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1", "pool.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2", "pool2.ntp.org"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_3", is_manual=False),
|
||||
CONF_SERVERS: ["pool3.ntp.org"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
[
|
||||
"SNTP supports maximum 3 servers. Dropped excess server(s): ['pool2.ntp.org', 'pool3.ntp.org']",
|
||||
"Found and merged 3 SNTP time configurations into one instance",
|
||||
],
|
||||
id="three_instances_drops_excess_servers",
|
||||
),
|
||||
pytest.param(
|
||||
[
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: [
|
||||
"192.168.1.1",
|
||||
"pool.ntp.org",
|
||||
"pool.ntp.org",
|
||||
"192.168.1.1",
|
||||
],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["pool.ntp.org", "192.168.1.2"],
|
||||
},
|
||||
],
|
||||
1,
|
||||
["192.168.1.1", "pool.ntp.org", "192.168.1.2"],
|
||||
["Found and merged 2 SNTP time configurations into one instance"],
|
||||
id="deduplication_multiple_duplicates",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_sntp_instance_merging(
|
||||
time_configs: list[dict[str, Any]],
|
||||
expected_count: int,
|
||||
expected_servers: list[str],
|
||||
warning_messages: list[str],
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test SNTP instance merging behavior."""
|
||||
# Create a mock full config with time configs
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
# Set the context var
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
# Get the updated config
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Check if merging occurred
|
||||
if len(time_configs) > 1:
|
||||
# Verify only one SNTP instance remains
|
||||
sntp_instances = [
|
||||
tc
|
||||
for tc in updated_conf[CONF_TIME]
|
||||
if tc.get(CONF_PLATFORM) == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == expected_count
|
||||
|
||||
# Verify server list
|
||||
assert sntp_instances[0][CONF_SERVERS] == expected_servers
|
||||
|
||||
# Verify warnings
|
||||
for expected_msg in warning_messages:
|
||||
assert any(
|
||||
expected_msg in record.message for record in caplog.records
|
||||
), f"Expected warning message '{expected_msg}' not found in log"
|
||||
else:
|
||||
# Single instance should not trigger merging or warnings
|
||||
assert len(caplog.records) == 0
|
||||
# Config should be unchanged
|
||||
assert updated_conf[CONF_TIME] == time_configs
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_inconsistent_manual_ids() -> None:
|
||||
"""Test that inconsistent manual IDs raise an error."""
|
||||
# Create configs with manual IDs that are inconsistent
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=True),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with pytest.raises(
|
||||
cv.Invalid,
|
||||
match="Found multiple SNTP configurations but id is inconsistent",
|
||||
):
|
||||
_sntp_final_validate({})
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
|
||||
|
||||
def test_sntp_with_other_time_platforms(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test that SNTP merging doesn't affect other time platforms."""
|
||||
time_configs = [
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_1", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.1"],
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: "homeassistant",
|
||||
CONF_ID: ID("homeassistant_time", is_manual=False),
|
||||
},
|
||||
{
|
||||
CONF_PLATFORM: CONF_SNTP,
|
||||
CONF_ID: ID("sntp_time_2", is_manual=False),
|
||||
CONF_SERVERS: ["192.168.1.2"],
|
||||
},
|
||||
]
|
||||
|
||||
full_conf = {CONF_TIME: time_configs.copy()}
|
||||
|
||||
token = fv.full_config.set(full_conf)
|
||||
try:
|
||||
with caplog.at_level(logging.WARNING):
|
||||
_sntp_final_validate({})
|
||||
|
||||
updated_conf = fv.full_config.get()
|
||||
|
||||
# Should have 2 time platforms: 1 merged SNTP + 1 homeassistant
|
||||
assert len(updated_conf[CONF_TIME]) == 2
|
||||
|
||||
# Find the platforms
|
||||
platforms = {tc[CONF_PLATFORM] for tc in updated_conf[CONF_TIME]}
|
||||
assert platforms == {CONF_SNTP, "homeassistant"}
|
||||
|
||||
# Verify SNTP was merged
|
||||
sntp_instances = [
|
||||
tc for tc in updated_conf[CONF_TIME] if tc[CONF_PLATFORM] == CONF_SNTP
|
||||
]
|
||||
assert len(sntp_instances) == 1
|
||||
assert sntp_instances[0][CONF_SERVERS] == ["192.168.1.1", "192.168.1.2"]
|
||||
finally:
|
||||
fv.full_config.reset(token)
|
||||
@@ -1,4 +1,6 @@
|
||||
"""Tests for the binary sensor component."""
|
||||
"""Tests for the text component."""
|
||||
|
||||
from esphome.core import CORE
|
||||
|
||||
|
||||
def test_text_is_setup(generate_main):
|
||||
@@ -56,15 +58,22 @@ def test_text_config_value_mode_set(generate_main):
|
||||
assert "it_3->traits.set_mode(text::TEXT_MODE_PASSWORD);" in main_cpp
|
||||
|
||||
|
||||
def test_text_config_lamda_is_set(generate_main):
|
||||
def test_text_config_lambda_is_set(generate_main) -> None:
|
||||
"""
|
||||
Test if lambda is set for lambda mode (optimized with stateless lambda)
|
||||
Test if lambda is set for lambda mode (optimized with stateless lambda and deduplication)
|
||||
"""
|
||||
# Given
|
||||
|
||||
# When
|
||||
main_cpp = generate_main("tests/component_tests/text/test_text.yaml")
|
||||
|
||||
# Get both global and main sections to find the shared lambda definition
|
||||
full_cpp = CORE.cpp_global_section + main_cpp
|
||||
|
||||
# Then
|
||||
assert "it_4->set_template([]() -> esphome::optional<std::string> {" in main_cpp
|
||||
assert 'return std::string{"Hello"};' in main_cpp
|
||||
# Lambda is deduplicated into a shared function (reference in main section)
|
||||
assert "it_4->set_template(shared_lambda_" in main_cpp
|
||||
# Lambda body should be in the code somewhere
|
||||
assert 'return std::string{"Hello"};' in full_cpp
|
||||
# Verify the shared lambda function is defined (in global section)
|
||||
assert "esphome::optional<std::string> shared_lambda_" in full_cpp
|
||||
|
||||
279
tests/unit_tests/test_lambda_dedup.py
Normal file
279
tests/unit_tests/test_lambda_dedup.py
Normal file
@@ -0,0 +1,279 @@
|
||||
"""Tests for lambda deduplication in cpp_generator."""
|
||||
|
||||
from esphome import cpp_generator as cg
|
||||
from esphome.core import CORE
|
||||
|
||||
|
||||
def test_deduplicate_identical_lambdas() -> None:
|
||||
"""Test that identical stateless lambdas are deduplicated."""
|
||||
# Create two identical lambda expressions
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
# Try to deduplicate them
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
# Both should get the same function name (deduplication happened)
|
||||
assert func_name1 == func_name2
|
||||
assert func_name1 == "shared_lambda_0"
|
||||
|
||||
|
||||
def test_different_lambdas_not_deduplicated() -> None:
|
||||
"""Test that different lambdas get different function names."""
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["return 24;"], # Different content
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
# Different lambdas should get different function names
|
||||
assert func_name1 != func_name2
|
||||
assert func_name1 == "shared_lambda_0"
|
||||
assert func_name2 == "shared_lambda_1"
|
||||
|
||||
|
||||
def test_different_return_types_not_deduplicated() -> None:
|
||||
"""Test that lambdas with different return types are not deduplicated."""
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["return 42;"], # Same content
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("float"), # Different return type
|
||||
)
|
||||
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
# Different return types = different functions
|
||||
assert func_name1 != func_name2
|
||||
|
||||
|
||||
def test_different_parameters_not_deduplicated() -> None:
|
||||
"""Test that lambdas with different parameters are not deduplicated."""
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return x;"],
|
||||
parameters=[("int", "x")],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["return x;"], # Same content
|
||||
parameters=[("float", "x")], # Different parameter type
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
# Different parameters = different functions
|
||||
assert func_name1 != func_name2
|
||||
|
||||
|
||||
def test_flush_lambda_dedup_declarations() -> None:
|
||||
"""Test that deferred declarations are properly stored for later flushing."""
|
||||
# Create a lambda which will create a deferred declaration
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
cg._get_shared_lambda_name(lambda1)
|
||||
|
||||
# Check that declaration was stored
|
||||
assert cg._KEY_LAMBDA_DEDUP_DECLARATIONS in CORE.data
|
||||
assert len(CORE.data[cg._KEY_LAMBDA_DEDUP_DECLARATIONS]) == 1
|
||||
|
||||
# Verify the declaration content is correct
|
||||
declaration = CORE.data[cg._KEY_LAMBDA_DEDUP_DECLARATIONS][0]
|
||||
assert "shared_lambda_0" in declaration
|
||||
assert "return 42;" in declaration
|
||||
|
||||
# Note: The actual flushing happens via CORE.add_job with FINAL priority
|
||||
# during real code generation, so we don't test that here
|
||||
|
||||
|
||||
def test_shared_function_lambda_expression() -> None:
|
||||
"""Test SharedFunctionLambdaExpression behaves correctly."""
|
||||
shared_lambda = cg.SharedFunctionLambdaExpression(
|
||||
func_name="shared_lambda_0",
|
||||
parameters=[],
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
# Should output just the function name
|
||||
assert str(shared_lambda) == "shared_lambda_0"
|
||||
|
||||
# Should have empty capture (stateless)
|
||||
assert shared_lambda.capture == ""
|
||||
|
||||
# Should have empty content (just a reference)
|
||||
assert shared_lambda.content == ""
|
||||
|
||||
|
||||
def test_lambda_deduplication_counter() -> None:
|
||||
"""Test that lambda counter increments correctly."""
|
||||
# Create 3 different lambdas
|
||||
for i in range(3):
|
||||
lambda_expr = cg.LambdaExpression(
|
||||
parts=[f"return {i};"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
func_name = cg._get_shared_lambda_name(lambda_expr)
|
||||
assert func_name == f"shared_lambda_{i}"
|
||||
|
||||
|
||||
def test_lambda_format_body() -> None:
|
||||
"""Test that format_body correctly formats lambda body with source."""
|
||||
# Without source
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["return 42;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=None,
|
||||
source=None,
|
||||
)
|
||||
assert lambda1.format_body() == "return 42;"
|
||||
|
||||
# With source would need a proper source object, skip for now
|
||||
|
||||
|
||||
def test_stateful_lambdas_not_deduplicated() -> None:
|
||||
"""Test that stateful lambdas (non-empty capture) are not deduplicated."""
|
||||
# _get_shared_lambda_name is only called for stateless lambdas (capture == "")
|
||||
# Stateful lambdas bypass deduplication entirely in process_lambda
|
||||
|
||||
# Verify that a stateful lambda would NOT get deduplicated
|
||||
# by checking it's not in the stateless dedup cache
|
||||
stateful_lambda = cg.LambdaExpression(
|
||||
parts=["return x + y;"],
|
||||
parameters=[],
|
||||
capture="=", # Non-empty capture means stateful
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
# Stateful lambdas should NOT be passed to _get_shared_lambda_name
|
||||
# This is enforced by the `if capture == ""` check in process_lambda
|
||||
# We verify the lambda has a non-empty capture
|
||||
assert stateful_lambda.capture != ""
|
||||
assert stateful_lambda.capture == "="
|
||||
|
||||
|
||||
def test_static_variable_detection() -> None:
|
||||
"""Test detection of static variables in lambda code."""
|
||||
# Should detect static variables
|
||||
assert cg._has_static_variables("static int counter = 0;")
|
||||
assert cg._has_static_variables("static bool flag = false; return flag;")
|
||||
assert cg._has_static_variables(" static float value = 1.0; ")
|
||||
|
||||
# Should NOT detect static_cast, static_assert, etc. (with underscores)
|
||||
assert not cg._has_static_variables("return static_cast<int>(value);")
|
||||
assert not cg._has_static_variables("static_assert(sizeof(int) == 4);")
|
||||
assert not cg._has_static_variables("auto ptr = static_pointer_cast<Foo>(bar);")
|
||||
|
||||
# Edge case: 'cast', 'assert', 'pointer_cast' are NOT C++ keywords
|
||||
# Someone could use them as type names, but we should NOT flag them
|
||||
# because they're not actually static variables with state
|
||||
# NOTE: These are valid C++ but extremely unlikely in ESPHome lambdas
|
||||
assert not cg._has_static_variables("static cast obj;") # 'cast' as type name
|
||||
assert not cg._has_static_variables("static assert value;") # 'assert' as type name
|
||||
assert not cg._has_static_variables(
|
||||
"static pointer_cast ptr;"
|
||||
) # 'pointer_cast' as type
|
||||
|
||||
# Should NOT detect in comments
|
||||
assert not cg._has_static_variables("// static int x = 0;\nreturn 42;")
|
||||
assert not cg._has_static_variables("/* static int y = 0; */ return 42;")
|
||||
|
||||
# Should detect even with comments elsewhere
|
||||
assert cg._has_static_variables("// comment\nstatic int x = 0;\nreturn x;")
|
||||
|
||||
# Should NOT detect non-static code
|
||||
assert not cg._has_static_variables("int counter = 0; return counter++;")
|
||||
assert not cg._has_static_variables("return 42;")
|
||||
|
||||
|
||||
def test_lambdas_with_static_not_deduplicated() -> None:
|
||||
"""Test that lambdas with static variables are not deduplicated."""
|
||||
# Two identical lambdas with static variables
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["static int counter = 0; return counter++;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["static int counter = 0; return counter++;"],
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
# Should return None (not deduplicated)
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
assert func_name1 is None
|
||||
assert func_name2 is None
|
||||
|
||||
|
||||
def test_lambdas_without_static_still_deduplicated() -> None:
|
||||
"""Test that lambdas without static variables are still deduplicated."""
|
||||
# Two identical lambdas WITHOUT static variables
|
||||
lambda1 = cg.LambdaExpression(
|
||||
parts=["int counter = 0; return counter++;"], # No static
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
lambda2 = cg.LambdaExpression(
|
||||
parts=["int counter = 0; return counter++;"], # No static
|
||||
parameters=[],
|
||||
capture="",
|
||||
return_type=cg.RawExpression("int"),
|
||||
)
|
||||
|
||||
# Should be deduplicated (same function name)
|
||||
func_name1 = cg._get_shared_lambda_name(lambda1)
|
||||
func_name2 = cg._get_shared_lambda_name(lambda2)
|
||||
|
||||
assert func_name1 is not None
|
||||
assert func_name2 is not None
|
||||
assert func_name1 == func_name2
|
||||
Reference in New Issue
Block a user