1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-17 07:15:48 +00:00

Merge remote-tracking branch 'upstream/dev' into integration

This commit is contained in:
J. Nick Koston
2025-11-04 08:34:24 -06:00
43 changed files with 1058 additions and 355 deletions

View File

@@ -192,6 +192,11 @@ jobs:
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
- name: Restore components graph cache
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: .temp/components_graph.json
key: components-graph-${{ hashFiles('esphome/components/**/*.py') }}
- name: Determine which tests to run
id: determine
env:
@@ -216,6 +221,12 @@ jobs:
echo "cpp-unit-tests-run-all=$(echo "$output" | jq -r '.cpp_unit_tests_run_all')" >> $GITHUB_OUTPUT
echo "cpp-unit-tests-components=$(echo "$output" | jq -c '.cpp_unit_tests_components')" >> $GITHUB_OUTPUT
echo "component-test-batches=$(echo "$output" | jq -c '.component_test_batches')" >> $GITHUB_OUTPUT
- name: Save components graph cache
if: github.ref == 'refs/heads/dev'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: .temp/components_graph.json
key: components-graph-${{ hashFiles('esphome/components/**/*.py') }}
integration-tests:
name: Run integration tests

View File

@@ -11,4 +11,5 @@ CONF_DRAW_ROUNDING = "draw_rounding"
CONF_ON_RECEIVE = "on_receive"
CONF_ON_STATE_CHANGE = "on_state_change"
CONF_REQUEST_HEADERS = "request_headers"
CONF_ROWS = "rows"
CONF_USE_PSRAM = "use_psram"

View File

@@ -70,7 +70,7 @@ bool DallasTemperatureSensor::read_scratch_pad_() {
}
void DallasTemperatureSensor::setup() {
if (!this->check_address_())
if (!this->check_address_or_index_())
return;
if (!this->read_scratch_pad_())
return;

View File

@@ -27,10 +27,6 @@ extern "C" {
#include <esp32-hal-bt.h>
#endif
#ifdef USE_SOCKET_SELECT_SUPPORT
#include <lwip/sockets.h>
#endif
namespace esphome::esp32_ble {
static const char *const TAG = "esp32_ble";

View File

@@ -25,10 +25,6 @@
#include <esp_gattc_api.h>
#include <esp_gatts_api.h>
#ifdef USE_SOCKET_SELECT_SUPPORT
#include <lwip/sockets.h>
#endif
namespace esphome::esp32_ble {
// Maximum size of the BLE event queue

View File

@@ -4,6 +4,7 @@ from esphome import automation, pins
import esphome.codegen as cg
from esphome.components import i2c
from esphome.components.esp32 import add_idf_component
from esphome.components.psram import DOMAIN as psram_domain
import esphome.config_validation as cv
from esphome.const import (
CONF_BRIGHTNESS,
@@ -26,10 +27,9 @@ import esphome.final_validate as fv
_LOGGER = logging.getLogger(__name__)
AUTO_LOAD = ["camera"]
DEPENDENCIES = ["esp32"]
AUTO_LOAD = ["camera", "psram"]
esp32_camera_ns = cg.esphome_ns.namespace("esp32_camera")
ESP32Camera = esp32_camera_ns.class_("ESP32Camera", cg.PollingComponent, cg.EntityBase)
ESP32CameraImageData = esp32_camera_ns.struct("CameraImageData")
@@ -163,6 +163,14 @@ CONF_ON_IMAGE = "on_image"
camera_range_param = cv.int_range(min=-2, max=2)
def validate_fb_location_(value):
validator = cv.enum(ENUM_FB_LOCATION, upper=True)
if value.lower() == psram_domain:
validator = cv.All(validator, cv.requires_component(psram_domain))
return validator(value)
CONFIG_SCHEMA = cv.All(
cv.ENTITY_BASE_SCHEMA.extend(
{
@@ -236,9 +244,9 @@ CONFIG_SCHEMA = cv.All(
cv.framerate, cv.Range(min=0, max=1)
),
cv.Optional(CONF_FRAME_BUFFER_COUNT, default=1): cv.int_range(min=1, max=2),
cv.Optional(CONF_FRAME_BUFFER_LOCATION, default="PSRAM"): cv.enum(
ENUM_FB_LOCATION, upper=True
),
cv.Optional(
CONF_FRAME_BUFFER_LOCATION, default="PSRAM"
): validate_fb_location_,
cv.Optional(CONF_ON_STREAM_START): automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(

View File

@@ -1,6 +1,6 @@
from esphome import automation, core
import esphome.codegen as cg
from esphome.components import wifi
from esphome.components import socket, wifi
from esphome.components.udp import CONF_ON_RECEIVE
import esphome.config_validation as cv
from esphome.const import (
@@ -17,6 +17,7 @@ from esphome.core import CORE, HexInt
from esphome.types import ConfigType
CODEOWNERS = ["@jesserockz"]
AUTO_LOAD = ["socket"]
byte_vector = cg.std_vector.template(cg.uint8)
peer_address_t = cg.std_ns.class_("array").template(cg.uint8, 6)
@@ -120,6 +121,10 @@ async def to_code(config):
if CORE.using_arduino:
cg.add_library("WiFi", None)
# ESP-NOW uses wake_loop_threadsafe() to wake the main loop from ESP-NOW callbacks
# This enables low-latency event processing instead of waiting for select() timeout
socket.require_wake_loop_threadsafe()
cg.add_define("USE_ESPNOW")
if wifi_channel := config.get(CONF_CHANNEL):
cg.add(var.set_wifi_channel(wifi_channel))

View File

@@ -4,6 +4,7 @@
#include "espnow_err.h"
#include "esphome/core/application.h"
#include "esphome/core/defines.h"
#include "esphome/core/log.h"
@@ -97,6 +98,11 @@ void on_send_report(const uint8_t *mac_addr, esp_now_send_status_t status)
// Push the packet to the queue
global_esp_now->receive_packet_queue_.push(packet);
// Push always because we're the only producer and the pool ensures we never exceed queue size
// Wake main loop immediately to process ESP-NOW send event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
App.wake_loop_threadsafe();
#endif
}
void on_data_received(const esp_now_recv_info_t *info, const uint8_t *data, int size) {
@@ -114,6 +120,11 @@ void on_data_received(const esp_now_recv_info_t *info, const uint8_t *data, int
// Push the packet to the queue
global_esp_now->receive_packet_queue_.push(packet);
// Push always because we're the only producer and the pool ensures we never exceed queue size
// Wake main loop immediately to process ESP-NOW receive event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
App.wake_loop_threadsafe();
#endif
}
ESPNowComponent::ESPNowComponent() { global_esp_now = this; }

View File

@@ -20,8 +20,7 @@ import esphome.final_validate as fv
from .const import INKPLATE_10_CUSTOM_WAVEFORMS, WAVEFORMS
DEPENDENCIES = ["i2c", "esp32"]
AUTO_LOAD = ["psram"]
DEPENDENCIES = ["i2c", "esp32", "psram"]
CONF_DISPLAY_DATA_0_PIN = "display_data_0_pin"
CONF_DISPLAY_DATA_1_PIN = "display_data_1_pin"

View File

@@ -41,10 +41,7 @@ from .lv_validation import lv_bool, lv_images_used
from .lvcode import LvContext, LvglComponent, lvgl_static
from .schemas import (
DISP_BG_SCHEMA,
FLEX_OBJ_SCHEMA,
FULL_STYLE_SCHEMA,
GRID_CELL_SCHEMA,
LAYOUT_SCHEMAS,
WIDGET_TYPES,
any_widget_schema,
container_schema,
@@ -78,6 +75,7 @@ from .widgets.button import button_spec
from .widgets.buttonmatrix import buttonmatrix_spec
from .widgets.canvas import canvas_spec
from .widgets.checkbox import checkbox_spec
from .widgets.container import container_spec
from .widgets.dropdown import dropdown_spec
from .widgets.img import img_spec
from .widgets.keyboard import keyboard_spec
@@ -130,20 +128,10 @@ for w_type in (
tileview_spec,
qr_code_spec,
canvas_spec,
container_spec,
):
WIDGET_TYPES[w_type.name] = w_type
WIDGET_SCHEMA = any_widget_schema()
LAYOUT_SCHEMAS[df.TYPE_GRID] = {
cv.Optional(df.CONF_WIDGETS): cv.ensure_list(any_widget_schema(GRID_CELL_SCHEMA))
}
LAYOUT_SCHEMAS[df.TYPE_FLEX] = {
cv.Optional(df.CONF_WIDGETS): cv.ensure_list(any_widget_schema(FLEX_OBJ_SCHEMA))
}
LAYOUT_SCHEMAS[df.TYPE_NONE] = {
cv.Optional(df.CONF_WIDGETS): cv.ensure_list(any_widget_schema())
}
for w_type in WIDGET_TYPES.values():
register_action(
f"lvgl.{w_type.name}.update",
@@ -410,7 +398,7 @@ def display_schema(config):
def add_hello_world(config):
if df.CONF_WIDGETS not in config and CONF_PAGES not in config:
LOGGER.info("No pages or widgets configured, creating default hello_world page")
config[df.CONF_WIDGETS] = cv.ensure_list(WIDGET_SCHEMA)(get_hello_world())
config[df.CONF_WIDGETS] = any_widget_schema()(get_hello_world())
return config
@@ -450,6 +438,7 @@ LVGL_SCHEMA = cv.All(
),
}
),
cv.Optional(CONF_PAGES): cv.ensure_list(container_schema(page_spec)),
**{
cv.Optional(x): validate_automation(
{
@@ -459,12 +448,6 @@ LVGL_SCHEMA = cv.All(
)
for x in SIMPLE_TRIGGERS
},
cv.Exclusive(df.CONF_WIDGETS, CONF_PAGES): cv.ensure_list(
WIDGET_SCHEMA
),
cv.Exclusive(CONF_PAGES, CONF_PAGES): cv.ensure_list(
container_schema(page_spec)
),
cv.Optional(df.CONF_MSGBOXES): cv.ensure_list(MSGBOX_SCHEMA),
cv.Optional(df.CONF_PAGE_WRAP, default=True): lv_bool,
cv.Optional(df.CONF_TOP_LAYER): container_schema(obj_spec),

View File

@@ -394,6 +394,8 @@ LV_FLEX_ALIGNMENTS = LvConstant(
"SPACE_BETWEEN",
)
LV_FLEX_CROSS_ALIGNMENTS = LV_FLEX_ALIGNMENTS.extend("STRETCH")
LV_MENU_MODES = LvConstant(
"LV_MENU_HEADER_",
"TOP_FIXED",
@@ -436,6 +438,7 @@ CONF_BUTTONS = "buttons"
CONF_BYTE_ORDER = "byte_order"
CONF_CHANGE_RATE = "change_rate"
CONF_CLOSE_BUTTON = "close_button"
CONF_CONTAINER = "container"
CONF_CONTROL = "control"
CONF_DEFAULT_FONT = "default_font"
CONF_DEFAULT_GROUP = "default_group"

View File

@@ -0,0 +1,357 @@
import re
import esphome.config_validation as cv
from esphome.const import CONF_HEIGHT, CONF_TYPE, CONF_WIDTH
from .defines import (
CONF_FLEX_ALIGN_CROSS,
CONF_FLEX_ALIGN_MAIN,
CONF_FLEX_ALIGN_TRACK,
CONF_FLEX_FLOW,
CONF_FLEX_GROW,
CONF_GRID_CELL_COLUMN_POS,
CONF_GRID_CELL_COLUMN_SPAN,
CONF_GRID_CELL_ROW_POS,
CONF_GRID_CELL_ROW_SPAN,
CONF_GRID_CELL_X_ALIGN,
CONF_GRID_CELL_Y_ALIGN,
CONF_GRID_COLUMN_ALIGN,
CONF_GRID_COLUMNS,
CONF_GRID_ROW_ALIGN,
CONF_GRID_ROWS,
CONF_LAYOUT,
CONF_PAD_COLUMN,
CONF_PAD_ROW,
CONF_WIDGETS,
FLEX_FLOWS,
LV_CELL_ALIGNMENTS,
LV_FLEX_ALIGNMENTS,
LV_FLEX_CROSS_ALIGNMENTS,
LV_GRID_ALIGNMENTS,
TYPE_FLEX,
TYPE_GRID,
TYPE_NONE,
LvConstant,
)
from .lv_validation import padding, size
cell_alignments = LV_CELL_ALIGNMENTS.one_of
grid_alignments = LV_GRID_ALIGNMENTS.one_of
flex_alignments = LV_FLEX_ALIGNMENTS.one_of
FLEX_LAYOUT_SCHEMA = {
cv.Required(CONF_TYPE): cv.one_of(TYPE_FLEX, lower=True),
cv.Optional(CONF_FLEX_FLOW, default="row_wrap"): FLEX_FLOWS.one_of,
cv.Optional(CONF_FLEX_ALIGN_MAIN, default="start"): flex_alignments,
cv.Optional(
CONF_FLEX_ALIGN_CROSS, default="start"
): LV_FLEX_CROSS_ALIGNMENTS.one_of,
cv.Optional(CONF_FLEX_ALIGN_TRACK, default="start"): flex_alignments,
cv.Optional(CONF_PAD_ROW): padding,
cv.Optional(CONF_PAD_COLUMN): padding,
cv.Optional(CONF_FLEX_GROW): cv.int_,
}
FLEX_HV_STYLE = {
CONF_FLEX_ALIGN_MAIN: "LV_FLEX_ALIGN_SPACE_EVENLY",
CONF_FLEX_ALIGN_TRACK: "LV_FLEX_ALIGN_CENTER",
CONF_FLEX_ALIGN_CROSS: "LV_FLEX_ALIGN_CENTER",
CONF_TYPE: TYPE_FLEX,
}
FLEX_OBJ_SCHEMA = {
cv.Optional(CONF_FLEX_GROW): cv.int_,
}
def flex_hv_schema(dir):
dir = CONF_HEIGHT if dir == "horizontal" else CONF_WIDTH
return {
cv.Optional(CONF_FLEX_GROW, default=1): cv.int_,
cv.Optional(dir, default="100%"): size,
}
def grid_free_space(value):
value = cv.Upper(value)
if value.startswith("FR(") and value.endswith(")"):
value = value.removesuffix(")").removeprefix("FR(")
return f"LV_GRID_FR({cv.positive_int(value)})"
raise cv.Invalid("must be a size in pixels, CONTENT or FR(nn)")
grid_spec = cv.Any(size, LvConstant("LV_GRID_", "CONTENT").one_of, grid_free_space)
GRID_CELL_SCHEMA = {
cv.Optional(CONF_GRID_CELL_ROW_POS): cv.positive_int,
cv.Optional(CONF_GRID_CELL_COLUMN_POS): cv.positive_int,
cv.Optional(CONF_GRID_CELL_ROW_SPAN, default=1): cv.positive_int,
cv.Optional(CONF_GRID_CELL_COLUMN_SPAN, default=1): cv.positive_int,
cv.Optional(CONF_GRID_CELL_X_ALIGN): grid_alignments,
cv.Optional(CONF_GRID_CELL_Y_ALIGN): grid_alignments,
}
class Layout:
"""
Define properties for a layout
The base class is layout "none"
"""
def get_type(self):
return TYPE_NONE
def get_layout_schemas(self, config: dict) -> tuple:
"""
Get the layout and child schema for a given widget based on its layout type.
"""
return None, {}
def validate(self, config):
"""
Validate the layout configuration. This is called late in the schema validation
:param config: The input configuration
:return: The validated configuration
"""
return config
class FlexLayout(Layout):
def get_type(self):
return TYPE_FLEX
def get_layout_schemas(self, config: dict) -> tuple:
layout = config.get(CONF_LAYOUT)
if not isinstance(layout, dict) or layout.get(CONF_TYPE) != TYPE_FLEX:
return None, {}
child_schema = FLEX_OBJ_SCHEMA
if grow := layout.get(CONF_FLEX_GROW):
child_schema = {cv.Optional(CONF_FLEX_GROW, default=grow): cv.int_}
# Polyfill to implement stretch alignment for flex containers
# LVGL does not support this natively, so we add a 100% size property to the children in the cross-axis
if layout.get(CONF_FLEX_ALIGN_CROSS) == "LV_FLEX_ALIGN_STRETCH":
dimension = (
CONF_WIDTH
if "COLUMN" in layout[CONF_FLEX_FLOW].upper()
else CONF_HEIGHT
)
child_schema[cv.Optional(dimension, default="100%")] = size
return FLEX_LAYOUT_SCHEMA, child_schema
def validate(self, config):
"""
Perform validation on the container and its children for this layout
:param config:
:return:
"""
return config
class DirectionalLayout(FlexLayout):
def __init__(self, direction: str, flow):
"""
:param direction: "horizontal" or "vertical"
:param flow: "row" or "column"
"""
super().__init__()
self.direction = direction
self.flow = flow
def get_type(self):
return self.direction
def get_layout_schemas(self, config: dict) -> tuple:
if config.get(CONF_LAYOUT, "").lower() != self.direction:
return None, {}
return cv.one_of(self.direction, lower=True), flex_hv_schema(self.direction)
def validate(self, config):
assert config[CONF_LAYOUT].lower() == self.direction
config[CONF_LAYOUT] = {
**FLEX_HV_STYLE,
CONF_FLEX_FLOW: "LV_FLEX_FLOW_" + self.flow.upper(),
}
return config
class GridLayout(Layout):
_GRID_LAYOUT_REGEX = re.compile(r"^\s*(\d+)\s*x\s*(\d+)\s*$")
def get_type(self):
return TYPE_GRID
def get_layout_schemas(self, config: dict) -> tuple:
layout = config.get(CONF_LAYOUT)
if isinstance(layout, str):
if GridLayout._GRID_LAYOUT_REGEX.match(layout):
return (
cv.string,
{
cv.Optional(CONF_GRID_CELL_ROW_POS): cv.positive_int,
cv.Optional(CONF_GRID_CELL_COLUMN_POS): cv.positive_int,
cv.Optional(
CONF_GRID_CELL_ROW_SPAN, default=1
): cv.positive_int,
cv.Optional(
CONF_GRID_CELL_COLUMN_SPAN, default=1
): cv.positive_int,
cv.Optional(
CONF_GRID_CELL_X_ALIGN, default="center"
): grid_alignments,
cv.Optional(
CONF_GRID_CELL_Y_ALIGN, default="center"
): grid_alignments,
},
)
# Not a valid grid layout string
return None, {}
if not isinstance(layout, dict) or layout.get(CONF_TYPE) != TYPE_GRID:
return None, {}
return (
{
cv.Required(CONF_TYPE): cv.one_of(TYPE_GRID, lower=True),
cv.Required(CONF_GRID_ROWS): [grid_spec],
cv.Required(CONF_GRID_COLUMNS): [grid_spec],
cv.Optional(CONF_GRID_COLUMN_ALIGN): grid_alignments,
cv.Optional(CONF_GRID_ROW_ALIGN): grid_alignments,
cv.Optional(CONF_PAD_ROW): padding,
cv.Optional(CONF_PAD_COLUMN): padding,
},
{
cv.Optional(CONF_GRID_CELL_ROW_POS): cv.positive_int,
cv.Optional(CONF_GRID_CELL_COLUMN_POS): cv.positive_int,
cv.Optional(CONF_GRID_CELL_ROW_SPAN, default=1): cv.positive_int,
cv.Optional(CONF_GRID_CELL_COLUMN_SPAN, default=1): cv.positive_int,
cv.Optional(CONF_GRID_CELL_X_ALIGN): grid_alignments,
cv.Optional(CONF_GRID_CELL_Y_ALIGN): grid_alignments,
},
)
def validate(self, config: dict):
"""
Validate the grid layout.
The `layout:` key may be a dictionary with `rows` and `columns` keys, or a string in the format "rows x columns".
Either all cells must have a row and column,
or none, in which case the grid layout is auto-generated.
:param config:
:return: The config updated with auto-generated values
"""
layout = config.get(CONF_LAYOUT)
if isinstance(layout, str):
# If the layout is a string, assume it is in the format "rows x columns", implying
# a grid layout with the specified number of rows and columns each with CONTENT sizing.
layout = layout.strip()
match = GridLayout._GRID_LAYOUT_REGEX.match(layout)
if match:
rows = int(match.group(1))
cols = int(match.group(2))
layout = {
CONF_TYPE: TYPE_GRID,
CONF_GRID_ROWS: ["LV_GRID_FR(1)"] * rows,
CONF_GRID_COLUMNS: ["LV_GRID_FR(1)"] * cols,
}
config[CONF_LAYOUT] = layout
else:
raise cv.Invalid(
f"Invalid grid layout format: {config}, expected 'rows x columns'",
[CONF_LAYOUT],
)
# should be guaranteed to be a dict at this point
assert isinstance(layout, dict)
assert layout.get(CONF_TYPE) == TYPE_GRID
rows = len(layout[CONF_GRID_ROWS])
columns = len(layout[CONF_GRID_COLUMNS])
used_cells = [[None] * columns for _ in range(rows)]
for index, widget in enumerate(config.get(CONF_WIDGETS, [])):
_, w = next(iter(widget.items()))
if (CONF_GRID_CELL_COLUMN_POS in w) != (CONF_GRID_CELL_ROW_POS in w):
raise cv.Invalid(
"Both row and column positions must be specified, or both omitted",
[CONF_WIDGETS, index],
)
if CONF_GRID_CELL_ROW_POS in w:
row = w[CONF_GRID_CELL_ROW_POS]
column = w[CONF_GRID_CELL_COLUMN_POS]
else:
try:
row, column = next(
(r_idx, c_idx)
for r_idx, row in enumerate(used_cells)
for c_idx, value in enumerate(row)
if value is None
)
except StopIteration:
raise cv.Invalid(
"No free cells available in grid layout", [CONF_WIDGETS, index]
) from None
w[CONF_GRID_CELL_ROW_POS] = row
w[CONF_GRID_CELL_COLUMN_POS] = column
for i in range(w[CONF_GRID_CELL_ROW_SPAN]):
for j in range(w[CONF_GRID_CELL_COLUMN_SPAN]):
if row + i >= rows or column + j >= columns:
raise cv.Invalid(
f"Cell at {row}/{column} span {w[CONF_GRID_CELL_ROW_SPAN]}x{w[CONF_GRID_CELL_COLUMN_SPAN]} "
f"exceeds grid size {rows}x{columns}",
[CONF_WIDGETS, index],
)
if used_cells[row + i][column + j] is not None:
raise cv.Invalid(
f"Cell span {row + i}/{column + j} already occupied by widget at index {used_cells[row + i][column + j]}",
[CONF_WIDGETS, index],
)
used_cells[row + i][column + j] = index
return config
LAYOUT_CLASSES = (
FlexLayout(),
GridLayout(),
DirectionalLayout("horizontal", "row"),
DirectionalLayout("vertical", "column"),
)
LAYOUT_CHOICES = [x.get_type() for x in LAYOUT_CLASSES]
def append_layout_schema(schema, config: dict):
"""
Get the child layout schema for a given widget based on its layout type.
:param config: The config to check
:return: A schema for the layout including a widgets key
"""
# Local import to avoid circular dependencies
if CONF_WIDGETS not in config:
if CONF_LAYOUT in config:
raise cv.Invalid(
f"Layout {config[CONF_LAYOUT]} requires a {CONF_WIDGETS} key",
[CONF_LAYOUT],
)
return schema
from .schemas import any_widget_schema
if CONF_LAYOUT not in config:
# If no layout is specified, return the schema as is
return schema.extend({cv.Optional(CONF_WIDGETS): any_widget_schema()})
for layout_class in LAYOUT_CLASSES:
layout_schema, child_schema = layout_class.get_layout_schemas(config)
if layout_schema:
layout_schema = cv.Schema(
{
cv.Required(CONF_LAYOUT): layout_schema,
cv.Required(CONF_WIDGETS): any_widget_schema(child_schema),
}
)
layout_schema.add_extra(layout_class.validate)
return layout_schema.extend(schema)
# If no layout class matched, return a default schema
return cv.Schema(
{
cv.Optional(CONF_LAYOUT): cv.one_of(*LAYOUT_CHOICES, lower=True),
cv.Optional(CONF_WIDGETS): any_widget_schema(),
}
)

View File

@@ -1,3 +1,4 @@
import re
from typing import TYPE_CHECKING, Any
import esphome.codegen as cg
@@ -246,6 +247,8 @@ def pixels_or_percent_validator(value):
return ["pixels", "..%"]
if isinstance(value, str) and value.lower().endswith("px"):
value = cv.int_(value[:-2])
if isinstance(value, str) and re.match(r"^lv_pct\((\d+)\)$", value):
return value
value = cv.Any(cv.int_, cv.percentage)(value)
if isinstance(value, int):
return value

View File

@@ -299,6 +299,7 @@ class LvExpr(MockLv):
# Top level mock for generic lv_ calls to be recorded
lv = MockLv("lv_")
LV = MockLv("LV_")
# Just generate an expression
lv_expr = LvExpr("lv_")
# Mock for lv_obj_ calls
@@ -327,7 +328,7 @@ def lv_assign(target, expression):
lv_add(AssignmentExpression("", "", target, expression))
def lv_Pvariable(type, name):
def lv_Pvariable(type, name) -> MockObj:
"""
Create but do not initialise a pointer variable
:param type: Type of the variable target
@@ -343,7 +344,7 @@ def lv_Pvariable(type, name):
return var
def lv_variable(type, name):
def lv_variable(type, name) -> MockObj:
"""
Create but do not initialise a variable
:param type: Type of the variable target

View File

@@ -171,6 +171,7 @@ bool LvPageType::is_showing() const { return this->parent_->get_current_page() =
void LvglComponent::draw_buffer_(const lv_area_t *area, lv_color_t *ptr) {
auto width = lv_area_get_width(area);
auto height = lv_area_get_height(area);
auto height_rounded = (height + this->draw_rounding - 1) / this->draw_rounding * this->draw_rounding;
auto x1 = area->x1;
auto y1 = area->y1;
lv_color_t *dst = this->rotate_buf_;
@@ -178,13 +179,13 @@ void LvglComponent::draw_buffer_(const lv_area_t *area, lv_color_t *ptr) {
case display::DISPLAY_ROTATION_90_DEGREES:
for (lv_coord_t x = height; x-- != 0;) {
for (lv_coord_t y = 0; y != width; y++) {
dst[y * height + x] = *ptr++;
dst[y * height_rounded + x] = *ptr++;
}
}
y1 = x1;
x1 = this->disp_drv_.ver_res - area->y1 - height;
width = height;
height = lv_area_get_width(area);
height = width;
width = height_rounded;
break;
case display::DISPLAY_ROTATION_180_DEGREES:
@@ -200,13 +201,13 @@ void LvglComponent::draw_buffer_(const lv_area_t *area, lv_color_t *ptr) {
case display::DISPLAY_ROTATION_270_DEGREES:
for (lv_coord_t x = 0; x != height; x++) {
for (lv_coord_t y = width; y-- != 0;) {
dst[y * height + x] = *ptr++;
dst[y * height_rounded + x] = *ptr++;
}
}
x1 = y1;
y1 = this->disp_drv_.hor_res - area->x1 - width;
width = height;
height = lv_area_get_width(area);
height = width;
width = height_rounded;
break;
default:
@@ -443,8 +444,10 @@ LvglComponent::LvglComponent(std::vector<display::Display *> displays, float buf
void LvglComponent::setup() {
auto *display = this->displays_[0];
auto width = display->get_width();
auto height = display->get_height();
auto rounding = this->draw_rounding;
// cater for displays with dimensions that don't divide by the required rounding
auto width = (display->get_width() + rounding - 1) / rounding * rounding;
auto height = (display->get_height() + rounding - 1) / rounding * rounding;
auto frac = this->buffer_frac_;
if (frac == 0)
frac = 1;
@@ -469,9 +472,8 @@ void LvglComponent::setup() {
}
this->buffer_frac_ = frac;
lv_disp_draw_buf_init(&this->draw_buf_, buffer, nullptr, buffer_pixels);
this->disp_drv_.hor_res = width;
this->disp_drv_.ver_res = height;
// this->setup_driver_(display->get_width(), display->get_height());
this->disp_drv_.hor_res = display->get_width();
this->disp_drv_.ver_res = display->get_height();
lv_disp_drv_update(this->disp_, &this->disp_drv_);
this->rotation = display->get_rotation();
if (this->rotation != display::DISPLAY_ROTATION_0_DEGREES) {

View File

@@ -12,17 +12,21 @@ from esphome.const import (
CONF_TEXT,
CONF_TIME,
CONF_TRIGGER_ID,
CONF_TYPE,
CONF_X,
CONF_Y,
)
from esphome.core import TimePeriod
from esphome.core.config import StartupTrigger
from esphome.schema_extractors import SCHEMA_EXTRACT
from . import defines as df, lv_validation as lvalid
from .defines import CONF_TIME_FORMAT, LV_GRAD_DIR, TYPE_GRID
from .helpers import add_lv_use, requires_component, validate_printf
from .defines import CONF_TIME_FORMAT, LV_GRAD_DIR
from .helpers import requires_component, validate_printf
from .layout import (
FLEX_OBJ_SCHEMA,
GRID_CELL_SCHEMA,
append_layout_schema,
grid_alignments,
)
from .lv_validation import lv_color, lv_font, lv_gradient, lv_image, opacity
from .lvcode import LvglComponent, lv_event_t_ptr
from .types import (
@@ -72,11 +76,9 @@ def _validate_text(value):
# A schema for text properties
TEXT_SCHEMA = cv.Schema(
{
cv.Optional(CONF_TEXT): _validate_text,
}
)
TEXT_SCHEMA = {
cv.Optional(CONF_TEXT): _validate_text,
}
LIST_ACTION_SCHEMA = cv.ensure_list(
cv.maybe_simple_value(
@@ -136,7 +138,7 @@ STYLE_PROPS = {
"arc_opa": lvalid.opacity,
"arc_color": lvalid.lv_color,
"arc_rounded": lvalid.lv_bool,
"arc_width": lvalid.lv_positive_int,
"arc_width": lvalid.pixels,
"anim_time": lvalid.lv_milliseconds,
"bg_color": lvalid.lv_color,
"bg_grad": lv_gradient,
@@ -223,10 +225,6 @@ STYLE_REMAP = {
"image_recolor_opa": "img_recolor_opa",
}
cell_alignments = df.LV_CELL_ALIGNMENTS.one_of
grid_alignments = df.LV_GRID_ALIGNMENTS.one_of
flex_alignments = df.LV_FLEX_ALIGNMENTS.one_of
# Complete object style schema
STYLE_SCHEMA = cv.Schema({cv.Optional(k): v for k, v in STYLE_PROPS.items()}).extend(
{
@@ -266,10 +264,8 @@ def part_schema(parts):
:param parts: The parts to include
:return: The schema
"""
return (
cv.Schema({cv.Optional(part): STATE_SCHEMA for part in parts})
.extend(STATE_SCHEMA)
.extend(FLAG_SCHEMA)
return STATE_SCHEMA.extend(FLAG_SCHEMA).extend(
{cv.Optional(part): STATE_SCHEMA for part in parts}
)
@@ -277,10 +273,10 @@ def automation_schema(typ: LvType):
events = df.LV_EVENT_TRIGGERS + df.SWIPE_TRIGGERS
if typ.has_on_value:
events = events + (CONF_ON_VALUE,)
args = typ.get_arg_type() if isinstance(typ, LvType) else []
args = typ.get_arg_type()
args.append(lv_event_t_ptr)
return cv.Schema(
{
return {
**{
cv.Optional(event): validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(
@@ -289,14 +285,11 @@ def automation_schema(typ: LvType):
}
)
for event in events
}
).extend(
{
cv.Optional(CONF_ON_BOOT): validate_automation(
{cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(StartupTrigger)}
)
}
)
},
cv.Optional(CONF_ON_BOOT): validate_automation(
{cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(StartupTrigger)}
),
}
def base_update_schema(widget_type, parts):
@@ -335,75 +328,17 @@ def obj_schema(widget_type: WidgetType):
"""
return (
part_schema(widget_type.parts)
.extend(LAYOUT_SCHEMA)
.extend(ALIGN_TO_SCHEMA)
.extend(automation_schema(widget_type.w_type))
.extend(
cv.Schema(
{
cv.Optional(CONF_STATE): SET_STATE_SCHEMA,
cv.Optional(CONF_GROUP): cv.use_id(lv_group_t),
}
)
{
cv.Optional(CONF_STATE): SET_STATE_SCHEMA,
cv.Optional(CONF_GROUP): cv.use_id(lv_group_t),
}
)
)
def _validate_grid_layout(config):
layout = config[df.CONF_LAYOUT]
rows = len(layout[df.CONF_GRID_ROWS])
columns = len(layout[df.CONF_GRID_COLUMNS])
used_cells = [[None] * columns for _ in range(rows)]
for index, widget in enumerate(config[df.CONF_WIDGETS]):
_, w = next(iter(widget.items()))
if (df.CONF_GRID_CELL_COLUMN_POS in w) != (df.CONF_GRID_CELL_ROW_POS in w):
# pylint: disable=raise-missing-from
raise cv.Invalid(
"Both row and column positions must be specified, or both omitted",
[df.CONF_WIDGETS, index],
)
if df.CONF_GRID_CELL_ROW_POS in w:
row = w[df.CONF_GRID_CELL_ROW_POS]
column = w[df.CONF_GRID_CELL_COLUMN_POS]
else:
try:
row, column = next(
(r_idx, c_idx)
for r_idx, row in enumerate(used_cells)
for c_idx, value in enumerate(row)
if value is None
)
except StopIteration:
# pylint: disable=raise-missing-from
raise cv.Invalid(
"No free cells available in grid layout", [df.CONF_WIDGETS, index]
)
w[df.CONF_GRID_CELL_ROW_POS] = row
w[df.CONF_GRID_CELL_COLUMN_POS] = column
for i in range(w[df.CONF_GRID_CELL_ROW_SPAN]):
for j in range(w[df.CONF_GRID_CELL_COLUMN_SPAN]):
if row + i >= rows or column + j >= columns:
# pylint: disable=raise-missing-from
raise cv.Invalid(
f"Cell at {row}/{column} span {w[df.CONF_GRID_CELL_ROW_SPAN]}x{w[df.CONF_GRID_CELL_COLUMN_SPAN]} "
f"exceeds grid size {rows}x{columns}",
[df.CONF_WIDGETS, index],
)
if used_cells[row + i][column + j] is not None:
# pylint: disable=raise-missing-from
raise cv.Invalid(
f"Cell span {row + i}/{column + j} already occupied by widget at index {used_cells[row + i][column + j]}",
[df.CONF_WIDGETS, index],
)
used_cells[row + i][column + j] = index
return config
LAYOUT_SCHEMAS = {}
LAYOUT_VALIDATORS = {TYPE_GRID: _validate_grid_layout}
ALIGN_TO_SCHEMA = {
cv.Optional(df.CONF_ALIGN_TO): cv.Schema(
{
@@ -416,57 +351,6 @@ ALIGN_TO_SCHEMA = {
}
def grid_free_space(value):
value = cv.Upper(value)
if value.startswith("FR(") and value.endswith(")"):
value = value.removesuffix(")").removeprefix("FR(")
return f"LV_GRID_FR({cv.positive_int(value)})"
raise cv.Invalid("must be a size in pixels, CONTENT or FR(nn)")
grid_spec = cv.Any(
lvalid.size, df.LvConstant("LV_GRID_", "CONTENT").one_of, grid_free_space
)
LAYOUT_SCHEMA = {
cv.Optional(df.CONF_LAYOUT): cv.typed_schema(
{
df.TYPE_GRID: {
cv.Required(df.CONF_GRID_ROWS): [grid_spec],
cv.Required(df.CONF_GRID_COLUMNS): [grid_spec],
cv.Optional(df.CONF_GRID_COLUMN_ALIGN): grid_alignments,
cv.Optional(df.CONF_GRID_ROW_ALIGN): grid_alignments,
cv.Optional(df.CONF_PAD_ROW): lvalid.padding,
cv.Optional(df.CONF_PAD_COLUMN): lvalid.padding,
},
df.TYPE_FLEX: {
cv.Optional(
df.CONF_FLEX_FLOW, default="row_wrap"
): df.FLEX_FLOWS.one_of,
cv.Optional(df.CONF_FLEX_ALIGN_MAIN, default="start"): flex_alignments,
cv.Optional(df.CONF_FLEX_ALIGN_CROSS, default="start"): flex_alignments,
cv.Optional(df.CONF_FLEX_ALIGN_TRACK, default="start"): flex_alignments,
cv.Optional(df.CONF_PAD_ROW): lvalid.padding,
cv.Optional(df.CONF_PAD_COLUMN): lvalid.padding,
},
},
lower=True,
)
}
GRID_CELL_SCHEMA = {
cv.Optional(df.CONF_GRID_CELL_ROW_POS): cv.positive_int,
cv.Optional(df.CONF_GRID_CELL_COLUMN_POS): cv.positive_int,
cv.Optional(df.CONF_GRID_CELL_ROW_SPAN, default=1): cv.positive_int,
cv.Optional(df.CONF_GRID_CELL_COLUMN_SPAN, default=1): cv.positive_int,
cv.Optional(df.CONF_GRID_CELL_X_ALIGN): grid_alignments,
cv.Optional(df.CONF_GRID_CELL_Y_ALIGN): grid_alignments,
}
FLEX_OBJ_SCHEMA = {
cv.Optional(df.CONF_FLEX_GROW): cv.int_,
}
DISP_BG_SCHEMA = cv.Schema(
{
cv.Optional(df.CONF_DISP_BG_IMAGE): cv.Any(
@@ -498,48 +382,11 @@ ALL_STYLES = {
}
def container_validator(schema, widget_type: WidgetType):
"""
Create a validator for a container given the widget type
:param schema: Base schema to extend
:param widget_type:
:return:
"""
def validator(value):
if w_sch := widget_type.schema:
if isinstance(w_sch, dict):
w_sch = cv.Schema(w_sch)
# order is important here to preserve extras
result = w_sch.extend(schema)
else:
result = schema
ltype = df.TYPE_NONE
if value and (layout := value.get(df.CONF_LAYOUT)):
if not isinstance(layout, dict):
raise cv.Invalid("Layout value must be a dict")
ltype = layout.get(CONF_TYPE)
if not ltype:
raise (cv.Invalid("Layout schema requires type:"))
add_lv_use(ltype)
if value == SCHEMA_EXTRACT:
return result
result = result.extend(
LAYOUT_SCHEMAS.get(ltype.lower(), LAYOUT_SCHEMAS[df.TYPE_NONE])
)
value = result(value)
if layout_validator := LAYOUT_VALIDATORS.get(ltype):
value = layout_validator(value)
return value
return validator
def container_schema(widget_type: WidgetType, extras=None):
"""
Create a schema for a container widget of a given type. All obj properties are available, plus
the extras passed in, plus any defined for the specific widget being specified.
:param widget_type: The widget type, e.g. "img"
:param widget_type: The widget type, e.g. "image"
:param extras: Additional options to be made available, e.g. layout properties for children
:return: The schema for this type of widget.
"""
@@ -549,31 +396,49 @@ def container_schema(widget_type: WidgetType, extras=None):
if extras:
schema = schema.extend(extras)
# Delayed evaluation for recursion
return container_validator(schema, widget_type)
schema = schema.extend(widget_type.schema)
def widget_schema(widget_type: WidgetType, extras=None):
"""
Create a schema for a given widget type
:param widget_type: The name of the widget
:param extras:
:return:
"""
validator = container_schema(widget_type, extras=extras)
if required := widget_type.required_component:
validator = cv.All(validator, requires_component(required))
return cv.Exclusive(widget_type.name, df.CONF_WIDGETS), validator
def validator(value):
return append_layout_schema(schema, value)(value)
# All widget schemas must be defined before this is called.
return validator
def any_widget_schema(extras=None):
"""
Generate schemas for all possible LVGL widgets. This is what implements the ability to have a list of any kind of
Dynamically generate schemas for all possible LVGL widgets. This is what implements the ability to have a list of any kind of
widget under the widgets: key.
:param extras: Additional schema to be applied to each generated one
:return:
:return: A validator for the Widgets key
"""
return cv.Any(dict(widget_schema(wt, extras) for wt in WIDGET_TYPES.values()))
def validator(value):
if isinstance(value, dict):
# Convert to list
value = [{k: v} for k, v in value.items()]
if not isinstance(value, list):
raise cv.Invalid("Expected a list of widgets")
result = []
for index, entry in enumerate(value):
if not isinstance(entry, dict) or len(entry) != 1:
raise cv.Invalid(
"Each widget must be a dictionary with a single key", path=[index]
)
[(key, value)] = entry.items()
# Validate the widget against its schema
widget_type = WIDGET_TYPES.get(key)
if not widget_type:
raise cv.Invalid(f"Unknown widget type: {key}", path=[index])
container_validator = container_schema(widget_type, extras=extras)
if required := widget_type.required_component:
container_validator = cv.All(
container_validator, requires_component(required)
)
# Apply custom validation
value = widget_type.validate(value or {})
result.append({key: container_validator(value)})
return result
return validator

View File

@@ -1,6 +1,7 @@
import sys
from esphome import automation, codegen as cg
from esphome.config_validation import Schema
from esphome.const import CONF_MAX_VALUE, CONF_MIN_VALUE, CONF_TEXT, CONF_VALUE
from esphome.cpp_generator import MockObj, MockObjClass
from esphome.cpp_types import esphome_ns
@@ -135,14 +136,14 @@ class WidgetType:
self.lv_name = lv_name or name
self.w_type = w_type
self.parts = parts
if schema is None:
self.schema = {}
else:
self.schema = schema
if not isinstance(schema, Schema):
schema = Schema(schema or {})
self.schema = schema
if modify_schema is None:
self.modify_schema = self.schema
else:
self.modify_schema = modify_schema
modify_schema = schema
if not isinstance(modify_schema, Schema):
modify_schema = Schema(modify_schema)
self.modify_schema = modify_schema
self.mock_obj = MockObj(f"lv_{self.lv_name}", "_")
@property
@@ -163,7 +164,6 @@ class WidgetType:
:param config: Its configuration
:return: Generated code as a list of text lines
"""
return []
async def obj_creator(self, parent: MockObjClass, config: dict):
"""
@@ -174,6 +174,13 @@ class WidgetType:
"""
return lv_expr.call(f"{self.lv_name}_create", parent)
def on_create(self, var: MockObj, config: dict):
"""
Called from to_code when the widget is created, to set up any initial properties
:param var: The variable representing the widget
:param config: Its configuration
"""
def get_uses(self):
"""
Get a list of other widgets used by this one
@@ -193,6 +200,14 @@ class WidgetType:
def get_scale(self, config: dict):
return 1.0
def validate(self, value):
"""
Provides an opportunity for custom validation for a given widget type
:param value:
:return:
"""
return value
class NumberType(WidgetType):
def get_max(self, config: dict):

View File

@@ -339,7 +339,10 @@ async def set_obj_properties(w: Widget, config):
if layout_type == TYPE_FLEX:
lv_obj.set_flex_flow(w.obj, literal(layout[CONF_FLEX_FLOW]))
main = literal(layout[CONF_FLEX_ALIGN_MAIN])
cross = literal(layout[CONF_FLEX_ALIGN_CROSS])
cross = layout[CONF_FLEX_ALIGN_CROSS]
if cross == "LV_FLEX_ALIGN_STRETCH":
cross = "LV_FLEX_ALIGN_CENTER"
cross = literal(cross)
track = literal(layout[CONF_FLEX_ALIGN_TRACK])
lv_obj.set_flex_align(w.obj, main, cross, track)
parts = collect_parts(config)
@@ -446,9 +449,11 @@ async def widget_to_code(w_cnfig, w_type: WidgetType, parent):
if spec.is_compound():
var = cg.new_Pvariable(wid)
lv_add(var.set_obj(creator))
spec.on_create(var.obj, w_cnfig)
else:
var = lv_Pvariable(lv_obj_t, wid)
lv_assign(var, creator)
spec.on_create(var, w_cnfig)
w = Widget.create(wid, var, spec, w_cnfig)
if theme := theme_widget_map.get(w_type):

View File

@@ -1,8 +1,9 @@
from esphome import automation
import esphome.codegen as cg
from esphome.components.const import CONF_ROWS
from esphome.components.key_provider import KeyProvider
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_ITEMS, CONF_ROWS, CONF_TEXT, CONF_WIDTH
from esphome.const import CONF_ID, CONF_ITEMS, CONF_TEXT, CONF_WIDTH
from esphome.cpp_generator import MockObj
from ..automation import action_to_code

View File

@@ -159,18 +159,15 @@ async def canvas_set_pixel(config, action_id, template_arg, args):
)
DRAW_SCHEMA = cv.Schema(
{
cv.GenerateID(CONF_ID): cv.use_id(lv_canvas_t),
cv.Required(CONF_X): pixels,
cv.Required(CONF_Y): pixels,
}
)
DRAW_OPA_SCHEMA = DRAW_SCHEMA.extend(
{
cv.Optional(CONF_OPA): opacity,
}
)
DRAW_SCHEMA = {
cv.GenerateID(CONF_ID): cv.use_id(lv_canvas_t),
cv.Required(CONF_X): pixels,
cv.Required(CONF_Y): pixels,
}
DRAW_OPA_SCHEMA = {
**DRAW_SCHEMA,
cv.Optional(CONF_OPA): opacity,
}
async def draw_to_code(config, dsc_type, props, do_draw, action_id, template_arg, args):
@@ -224,12 +221,14 @@ RECT_PROPS = {
@automation.register_action(
"lvgl.canvas.draw_rectangle",
ObjUpdateAction,
DRAW_SCHEMA.extend(
cv.Schema(
{
**DRAW_OPA_SCHEMA,
cv.Required(CONF_WIDTH): cv.templatable(cv.int_),
cv.Required(CONF_HEIGHT): cv.templatable(cv.int_),
},
).extend({cv.Optional(prop): STYLE_PROPS[prop] for prop in RECT_PROPS}),
**{cv.Optional(prop): STYLE_PROPS[prop] for prop in RECT_PROPS},
}
),
)
async def canvas_draw_rect(config, action_id, template_arg, args):
width = await pixels.process(config[CONF_WIDTH])
@@ -261,13 +260,14 @@ TEXT_PROPS = {
@automation.register_action(
"lvgl.canvas.draw_text",
ObjUpdateAction,
TEXT_SCHEMA.extend(DRAW_OPA_SCHEMA)
.extend(
cv.Schema(
{
**TEXT_SCHEMA,
**DRAW_OPA_SCHEMA,
cv.Required(CONF_MAX_WIDTH): cv.templatable(cv.int_),
**{cv.Optional(prop): STYLE_PROPS[f"text_{prop}"] for prop in TEXT_PROPS},
},
)
.extend({cv.Optional(prop): STYLE_PROPS[f"text_{prop}"] for prop in TEXT_PROPS}),
),
)
async def canvas_draw_text(config, action_id, template_arg, args):
text = await lv_text.process(config[CONF_TEXT])
@@ -293,13 +293,15 @@ IMG_PROPS = {
@automation.register_action(
"lvgl.canvas.draw_image",
ObjUpdateAction,
DRAW_OPA_SCHEMA.extend(
cv.Schema(
{
**DRAW_OPA_SCHEMA,
cv.Required(CONF_SRC): lv_image,
cv.Optional(CONF_PIVOT_X, default=0): pixels,
cv.Optional(CONF_PIVOT_Y, default=0): pixels,
},
).extend({cv.Optional(prop): validator for prop, validator in IMG_PROPS.items()}),
**{cv.Optional(prop): validator for prop, validator in IMG_PROPS.items()},
}
),
)
async def canvas_draw_image(config, action_id, template_arg, args):
src = await lv_image.process(config[CONF_SRC])
@@ -336,8 +338,9 @@ LINE_PROPS = {
cv.GenerateID(CONF_ID): cv.use_id(lv_canvas_t),
cv.Optional(CONF_OPA): opacity,
cv.Required(CONF_POINTS): cv.ensure_list(point_schema),
},
).extend({cv.Optional(prop): validator for prop, validator in LINE_PROPS.items()}),
**{cv.Optional(prop): validator for prop, validator in LINE_PROPS.items()},
}
),
)
async def canvas_draw_line(config, action_id, template_arg, args):
points = [
@@ -363,8 +366,9 @@ async def canvas_draw_line(config, action_id, template_arg, args):
{
cv.GenerateID(CONF_ID): cv.use_id(lv_canvas_t),
cv.Required(CONF_POINTS): cv.ensure_list(point_schema),
**{cv.Optional(prop): STYLE_PROPS[prop] for prop in RECT_PROPS},
},
).extend({cv.Optional(prop): STYLE_PROPS[prop] for prop in RECT_PROPS}),
),
)
async def canvas_draw_polygon(config, action_id, template_arg, args):
points = [
@@ -395,13 +399,15 @@ ARC_PROPS = {
@automation.register_action(
"lvgl.canvas.draw_arc",
ObjUpdateAction,
DRAW_OPA_SCHEMA.extend(
cv.Schema(
{
**DRAW_OPA_SCHEMA,
cv.Required(CONF_RADIUS): pixels,
cv.Required(CONF_START_ANGLE): lv_angle_degrees,
cv.Required(CONF_END_ANGLE): lv_angle_degrees,
**{cv.Optional(prop): validator for prop, validator in ARC_PROPS.items()},
}
).extend({cv.Optional(prop): validator for prop, validator in ARC_PROPS.items()}),
),
)
async def canvas_draw_arc(config, action_id, template_arg, args):
radius = await size.process(config[CONF_RADIUS])

View File

@@ -17,11 +17,10 @@ class CheckboxType(WidgetType):
CONF_CHECKBOX,
LvBoolean("lv_checkbox_t"),
(CONF_MAIN, CONF_INDICATOR),
TEXT_SCHEMA.extend(
{
Optional(CONF_PAD_COLUMN): padding,
}
),
{
**TEXT_SCHEMA,
Optional(CONF_PAD_COLUMN): padding,
},
)
async def to_code(self, w: Widget, config):

View File

@@ -0,0 +1,39 @@
import esphome.config_validation as cv
from esphome.const import CONF_HEIGHT, CONF_WIDTH
from esphome.cpp_generator import MockObj
from ..defines import CONF_CONTAINER, CONF_MAIN, CONF_OBJ, CONF_SCROLLBAR
from ..lv_validation import size
from ..lvcode import lv
from ..types import WidgetType, lv_obj_t
CONTAINER_SCHEMA = cv.Schema(
{
cv.Optional(CONF_HEIGHT, default="100%"): size,
cv.Optional(CONF_WIDTH, default="100%"): size,
}
)
class ContainerType(WidgetType):
"""
A simple container widget that can hold other widgets and which defaults to a 100% size.
Made from an obj with all styles removed
"""
def __init__(self):
super().__init__(
CONF_CONTAINER,
lv_obj_t,
(CONF_MAIN, CONF_SCROLLBAR),
schema=CONTAINER_SCHEMA,
modify_schema={},
lv_name=CONF_OBJ,
)
self.styles = {}
def on_create(self, var: MockObj, config: dict):
lv.obj_remove_style_all(var)
container_spec = ContainerType()

View File

@@ -23,12 +23,11 @@ class LabelType(WidgetType):
CONF_LABEL,
LvText("lv_label_t"),
(CONF_MAIN, CONF_SCROLLBAR, CONF_SELECTED),
TEXT_SCHEMA.extend(
{
cv.Optional(CONF_RECOLOR): lv_bool,
cv.Optional(CONF_LONG_MODE): LV_LONG_MODES.one_of,
}
),
{
**TEXT_SCHEMA,
cv.Optional(CONF_RECOLOR): lv_bool,
cv.Optional(CONF_LONG_MODE): LV_LONG_MODES.one_of,
},
)
async def to_code(self, w: Widget, config):

View File

@@ -14,13 +14,12 @@ CONF_QRCODE = "qrcode"
CONF_DARK_COLOR = "dark_color"
CONF_LIGHT_COLOR = "light_color"
QRCODE_SCHEMA = TEXT_SCHEMA.extend(
{
cv.Optional(CONF_DARK_COLOR, default="black"): lv_color,
cv.Optional(CONF_LIGHT_COLOR, default="white"): lv_color,
cv.Required(CONF_SIZE): cv.int_,
}
)
QRCODE_SCHEMA = {
**TEXT_SCHEMA,
cv.Optional(CONF_DARK_COLOR, default="black"): lv_color,
cv.Optional(CONF_LIGHT_COLOR, default="white"): lv_color,
cv.Required(CONF_SIZE): cv.int_,
}
class QrCodeType(WidgetType):

View File

@@ -21,15 +21,14 @@ CONF_TEXTAREA = "textarea"
lv_textarea_t = LvText("lv_textarea_t")
TEXTAREA_SCHEMA = TEXT_SCHEMA.extend(
{
cv.Optional(CONF_PLACEHOLDER_TEXT): lv_text,
cv.Optional(CONF_ACCEPTED_CHARS): lv_text,
cv.Optional(CONF_ONE_LINE): lv_bool,
cv.Optional(CONF_PASSWORD_MODE): lv_bool,
cv.Optional(CONF_MAX_LENGTH): lv_int,
}
)
TEXTAREA_SCHEMA = {
**TEXT_SCHEMA,
cv.Optional(CONF_PLACEHOLDER_TEXT): lv_text,
cv.Optional(CONF_ACCEPTED_CHARS): lv_text,
cv.Optional(CONF_ONE_LINE): lv_bool,
cv.Optional(CONF_PASSWORD_MODE): lv_bool,
cv.Optional(CONF_MAX_LENGTH): lv_int,
}
class TextareaType(WidgetType):

View File

@@ -1,8 +1,9 @@
from esphome import automation, pins
import esphome.codegen as cg
from esphome.components import key_provider
from esphome.components.const import CONF_ROWS
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_ON_KEY, CONF_PIN, CONF_ROWS, CONF_TRIGGER_ID
from esphome.const import CONF_ID, CONF_ON_KEY, CONF_PIN, CONF_TRIGGER_ID
CODEOWNERS = ["@ssieb"]

View File

@@ -3,7 +3,7 @@ import re
from esphome import automation
from esphome.automation import Condition
import esphome.codegen as cg
from esphome.components import logger
from esphome.components import logger, socket
from esphome.components.esp32 import add_idf_sdkconfig_option
from esphome.config_helpers import filter_source_files_from_platform
import esphome.config_validation as cv
@@ -66,6 +66,9 @@ DEPENDENCIES = ["network"]
def AUTO_LOAD():
if CORE.is_esp8266 or CORE.is_libretiny:
return ["async_tcp", "json"]
# ESP32 needs socket for wake_loop_threadsafe()
if CORE.is_esp32:
return ["json", "socket"]
return ["json"]
@@ -213,8 +216,6 @@ def validate_fingerprint(value):
def _consume_mqtt_sockets(config: ConfigType) -> ConfigType:
"""Register socket needs for MQTT component."""
from esphome.components import socket
# MQTT needs 1 socket for the broker connection
socket.consume_sockets(1, "mqtt")(config)
return config
@@ -341,6 +342,11 @@ async def to_code(config):
# https://github.com/heman/async-mqtt-client/blob/master/library.json
cg.add_library("heman/AsyncMqttClient-esphome", "2.0.0")
# MQTT on ESP32 uses wake_loop_threadsafe() to wake the main loop from the MQTT event handler
# This enables low-latency MQTT event processing instead of waiting for select() timeout
if CORE.is_esp32:
socket.require_wake_loop_threadsafe()
cg.add_define("USE_MQTT")
cg.add_global(mqtt_ns.using)

View File

@@ -190,6 +190,11 @@ void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t b
if (instance) {
auto event = *static_cast<esp_mqtt_event_t *>(event_data);
instance->mqtt_events_.emplace(event);
// Wake main loop immediately to process MQTT event instead of waiting for select() timeout
#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
App.wake_loop_threadsafe();
#endif
}
}

View File

@@ -1,6 +1,6 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ADDRESS
from esphome.const import CONF_ADDRESS, CONF_INDEX
CODEOWNERS = ["@ssieb"]
@@ -21,7 +21,8 @@ def one_wire_device_schema():
return cv.Schema(
{
cv.GenerateID(CONF_ONE_WIRE_ID): cv.use_id(OneWireBus),
cv.Optional(CONF_ADDRESS): cv.hex_uint64_t,
cv.Exclusive(CONF_ADDRESS, "index_or_address"): cv.hex_uint64_t,
cv.Exclusive(CONF_INDEX, "index_or_address"): cv.uint8_t,
}
)
@@ -37,3 +38,5 @@ async def register_one_wire_device(var, config):
cg.add(var.set_one_wire_bus(parent))
if (address := config.get(CONF_ADDRESS)) is not None:
cg.add(var.set_address(address))
if (index := config.get(CONF_INDEX)) is not None:
cg.add(var.set_index(index))

View File

@@ -18,10 +18,20 @@ bool OneWireDevice::send_command_(uint8_t cmd) {
return true;
}
bool OneWireDevice::check_address_() {
bool OneWireDevice::check_address_or_index_() {
if (this->address_ != 0)
return true;
auto devices = this->bus_->get_devices();
if (this->index_ != INDEX_NOT_SET) {
if (this->index_ >= devices.size()) {
ESP_LOGE(TAG, "Index %d out of range, only %d devices found", this->index_, devices.size());
return false;
}
this->address_ = devices[this->index_];
return true;
}
if (devices.empty()) {
ESP_LOGE(TAG, "No devices, can't auto-select address");
return false;

View File

@@ -17,6 +17,8 @@ class OneWireDevice {
/// @param address of the device
void set_address(uint64_t address) { this->address_ = address; }
void set_index(uint8_t index) { this->index_ = index; }
/// @brief store the pointer to the OneWireBus to use
/// @param bus pointer to the OneWireBus object
void set_one_wire_bus(OneWireBus *bus) { this->bus_ = bus; }
@@ -25,13 +27,16 @@ class OneWireDevice {
const std::string &get_address_name();
protected:
static constexpr uint8_t INDEX_NOT_SET = 255;
uint64_t address_{0};
uint8_t index_{INDEX_NOT_SET};
OneWireBus *bus_{nullptr}; ///< pointer to OneWireBus instance
std::string address_name_;
/// @brief find an address if necessary
/// should be called from setup
bool check_address_();
bool check_address_or_index_();
/// @brief send command on the bus
/// @param cmd command to send

View File

@@ -1,4 +1,5 @@
import logging
import textwrap
import esphome.codegen as cg
from esphome.components.esp32 import (
@@ -104,6 +105,17 @@ def get_config_schema(config):
if not speeds:
raise cv.Invalid("PSRAM is not supported on this chip")
modes = SPIRAM_MODES[variant]
if CONF_MODE not in config and len(modes) != 1:
raise (
cv.Invalid(
textwrap.dedent(
f"""
{variant} requires PSRAM mode selection; one of {", ".join(modes)}
Selection of the wrong mode for the board will cause a runtime failure to initialise PSRAM
"""
)
)
)
return cv.Schema(
{
cv.GenerateID(): cv.declare_id(PsramComponent),

View File

@@ -26,21 +26,12 @@ from esphome.const import (
from esphome.core import CORE, HexInt
from esphome.core.entity_helpers import inherit_property_from
from esphome.external_files import download_content
from esphome.types import ConfigType
from esphome.final_validate import full_config
_LOGGER = logging.getLogger(__name__)
def AUTO_LOAD(config: ConfigType) -> list[str]:
load = ["audio"]
if (
not config
or config.get(CONF_TASK_STACK_IN_PSRAM)
or config.get(CONF_CODEC_SUPPORT_ENABLED)
):
return load + ["psram"]
return load
AUTO_LOAD = ["audio"]
CODEOWNERS = ["@kahrendt", "@synesthesiam"]
DOMAIN = "media_player"
@@ -226,12 +217,19 @@ def _validate_repeated_speaker(config):
return config
def _validate_supported_local_file(config):
def _final_validate(config):
# Default to using codec if psram is enabled
if (use_codec := config.get(CONF_CODEC_SUPPORT_ENABLED)) is None:
use_codec = psram.DOMAIN in full_config.get()
conf_id = config[CONF_ID].id
core_data = CORE.data.setdefault(DOMAIN, {conf_id: {}})
core_data[conf_id][CONF_CODEC_SUPPORT_ENABLED] = use_codec
for file_config in config.get(CONF_FILES, []):
_, media_file_type = _read_audio_file_and_type(file_config)
if str(media_file_type) == str(audio.AUDIO_FILE_TYPE_ENUM["NONE"]):
raise cv.Invalid("Unsupported local media file")
if not config[CONF_CODEC_SUPPORT_ENABLED] and str(media_file_type) != str(
if not use_codec and str(media_file_type) != str(
audio.AUDIO_FILE_TYPE_ENUM["WAV"]
):
# Only wav files are supported
@@ -290,11 +288,11 @@ CONFIG_SCHEMA = cv.All(
cv.Optional(CONF_BUFFER_SIZE, default=1000000): cv.int_range(
min=4000, max=4000000
),
cv.Optional(
CONF_CODEC_SUPPORT_ENABLED, default=psram.supported()
): cv.boolean,
cv.Optional(CONF_CODEC_SUPPORT_ENABLED): cv.boolean,
cv.Optional(CONF_FILES): cv.ensure_list(MEDIA_FILE_TYPE_SCHEMA),
cv.Optional(CONF_TASK_STACK_IN_PSRAM, default=False): cv.boolean,
cv.Optional(CONF_TASK_STACK_IN_PSRAM): cv.All(
cv.boolean, cv.requires_component(psram.DOMAIN)
),
cv.Optional(CONF_VOLUME_INCREMENT, default=0.05): cv.percentage,
cv.Optional(CONF_VOLUME_INITIAL, default=0.5): cv.percentage,
cv.Optional(CONF_VOLUME_MAX, default=1.0): cv.percentage,
@@ -317,12 +315,12 @@ FINAL_VALIDATE_SCHEMA = cv.All(
},
extra=cv.ALLOW_EXTRA,
),
_validate_supported_local_file,
_final_validate,
)
async def to_code(config):
if config[CONF_CODEC_SUPPORT_ENABLED]:
if CORE.data[DOMAIN][config[CONF_ID].id][CONF_CODEC_SUPPORT_ENABLED]:
# Compile all supported audio codecs and optimize the wifi settings
cg.add_define("USE_AUDIO_FLAC_SUPPORT", True)
@@ -352,8 +350,8 @@ async def to_code(config):
cg.add(var.set_buffer_size(config[CONF_BUFFER_SIZE]))
cg.add(var.set_task_stack_in_psram(config[CONF_TASK_STACK_IN_PSRAM]))
if config[CONF_TASK_STACK_IN_PSRAM]:
if config.get(CONF_TASK_STACK_IN_PSRAM):
cg.add(var.set_task_stack_in_psram(True))
esp32.add_idf_sdkconfig_option(
"CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
)

View File

@@ -838,7 +838,6 @@ CONF_RMT_CHANNEL = "rmt_channel"
CONF_RMT_SYMBOLS = "rmt_symbols"
CONF_ROTATION = "rotation"
CONF_ROW = "row"
CONF_ROWS = "rows"
CONF_RS_PIN = "rs_pin"
CONF_RTD_NOMINAL_RESISTANCE = "rtd_nominal_resistance"
CONF_RTD_WIRES = "rtd_wires"

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from collections.abc import Callable
from functools import cache
import hashlib
import json
import os
import os.path
@@ -52,6 +53,10 @@ BASE_BUS_COMPONENTS = {
"remote_receiver",
}
# Cache version for components graph
# Increment this when the cache format or graph building logic changes
COMPONENTS_GRAPH_CACHE_VERSION = 1
def parse_list_components_output(output: str) -> list[str]:
"""Parse the output from list-components.py script.
@@ -101,7 +106,11 @@ def get_component_from_path(file_path: str) -> str | None:
):
parts = file_path.split("/")
if len(parts) >= 3 and parts[2]:
return parts[2]
# Verify that parts[2] is actually a component directory, not a file
# like .gitignore or README.md in the components directory itself
component_name = parts[2]
if "." not in component_name:
return component_name
return None
@@ -752,20 +761,71 @@ def resolve_auto_load(
return auto_load()
@cache
def get_components_graph_cache_key() -> str:
"""Generate cache key based on all component Python file hashes.
Uses git ls-files with sha1 hashes to generate a stable cache key that works
across different machines and CI runs. This is faster and more reliable than
reading file contents or using modification times.
Returns:
SHA256 hex string uniquely identifying the current component state
"""
# Use git ls-files -s to get sha1 hashes of all component Python files
# Format: <mode> <sha1> <stage> <path>
# This is fast and works consistently across CI and local dev
# We hash all .py files because AUTO_LOAD, DEPENDENCIES, etc. can be defined
# in any Python file, not just __init__.py
cmd = ["git", "ls-files", "-s", "esphome/components/**/*.py"]
result = subprocess.run(
cmd, capture_output=True, text=True, check=True, cwd=root_path, close_fds=False
)
# Hash the git output (includes file paths and their sha1 hashes)
# This changes only when component Python files actually change
hasher = hashlib.sha256()
hasher.update(result.stdout.encode())
return hasher.hexdigest()
def create_components_graph() -> dict[str, list[str]]:
"""Create a graph of component dependencies.
"""Create a graph of component dependencies (cached).
This function is expensive (5-6 seconds) because it imports all ESPHome components
to extract their DEPENDENCIES and AUTO_LOAD metadata. The result is cached based
on component file modification times, so unchanged components don't trigger a rebuild.
Returns:
Dictionary mapping parent components to their children (dependencies)
"""
from pathlib import Path
# Check cache first - use fixed filename since GitHub Actions cache doesn't support wildcards
cache_file = Path(temp_folder) / "components_graph.json"
if cache_file.exists():
try:
cached_data = json.loads(cache_file.read_text())
except (OSError, json.JSONDecodeError):
# Cache file corrupted or unreadable, rebuild
pass
else:
# Verify cache version matches
if cached_data.get("_version") == COMPONENTS_GRAPH_CACHE_VERSION:
# Verify cache is for current component state
cache_key = get_components_graph_cache_key()
if cached_data.get("_cache_key") == cache_key:
return cached_data.get("graph", {})
# Cache key mismatch - stale cache, rebuild
# Cache version mismatch - incompatible format, rebuild
from esphome import const
from esphome.core import CORE
from esphome.loader import ComponentManifest, get_component, get_platform
# The root directory of the repo
root = Path(__file__).parent.parent
root = Path(root_path)
components_dir = root / ESPHOME_COMPONENTS_PATH
# Fake some directory so that get_component works
CORE.config_path = root
@@ -842,6 +902,15 @@ def create_components_graph() -> dict[str, list[str]]:
# restore config
CORE.data[KEY_CORE] = TARGET_CONFIGURATIONS[0]
# Save to cache with version and cache key for validation
cache_data = {
"_version": COMPONENTS_GRAPH_CACHE_VERSION,
"_cache_key": get_components_graph_cache_key(),
"graph": components_graph,
}
cache_file.parent.mkdir(exist_ok=True)
cache_file.write_text(json.dumps(cache_data))
return components_graph

View File

@@ -34,6 +34,12 @@ SUPPORTED_PSRAM_VARIANTS = [
VARIANT_ESP32S3,
VARIANT_ESP32P4,
]
SUPPORTED_PSRAM_MODES = {
VARIANT_ESP32: ["quad"],
VARIANT_ESP32S2: ["quad"],
VARIANT_ESP32S3: ["quad", "octal"],
VARIANT_ESP32P4: ["hex"],
}
@pytest.mark.parametrize(
@@ -86,7 +92,7 @@ def test_psram_configuration_valid_supported_variants(
from esphome.components.psram import CONFIG_SCHEMA, FINAL_VALIDATE_SCHEMA
# This should not raise an exception
config = CONFIG_SCHEMA({})
config = CONFIG_SCHEMA({"mode": SUPPORTED_PSRAM_MODES[variant][0]})
FINAL_VALIDATE_SCHEMA(config)
@@ -122,7 +128,7 @@ def _setup_psram_final_validation_test(
("config", "esp32_config", "expect_error", "error_match"),
[
pytest.param(
{"speed": "120MHz"},
{"mode": "quad", "speed": "120MHz"},
{"cpu_frequency": "160MHz"},
True,
r"PSRAM 120MHz requires 240MHz CPU frequency",
@@ -143,7 +149,7 @@ def _setup_psram_final_validation_test(
id="ecc_only_in_octal_mode",
),
pytest.param(
{"speed": "120MHZ"},
{"mode": "quad", "speed": "120MHZ"},
{"cpu_frequency": "240MHZ"},
False,
None,

View File

@@ -9,3 +9,6 @@ sensor:
resolution: 9
- platform: dallas_temp
name: Dallas Temperature 2
- platform: dallas_temp
name: Dallas Temperature 3
index: 2

View File

@@ -1,4 +1,7 @@
packages:
i2c: !include ../../test_build_components/common/i2c/esp32-idf.yaml
psram:
mode: quad
<<: !include common.yaml

View File

@@ -113,9 +113,10 @@ lvgl:
title: Messagebox
bg_color: 0xffff
widgets:
- label:
text: Hello Msgbox
id: msgbox_label
# Test single widget without list
label:
text: Hello Msgbox
id: msgbox_label
body:
text: This is a sample messagebox
bg_color: 0x808080
@@ -281,7 +282,7 @@ lvgl:
#endif
return std::string(buf);
align: top_left
- obj:
- container:
align: center
arc_opa: COVER
arc_color: 0xFF0000
@@ -414,6 +415,7 @@ lvgl:
- buttons:
- id: button_e
- button:
layout: 2x1
id: button_button
width: 20%
height: 10%
@@ -430,8 +432,13 @@ lvgl:
checked:
bg_color: 0x000000
widgets:
- label:
text: Button
# Test parse a dict instead of list
label:
text: Button
align: bottom_right
image:
src: cat_image
align: top_left
on_click:
- lvgl.widget.focus: spin_up
- lvgl.widget.focus: next
@@ -539,6 +546,7 @@ lvgl:
- logger.log: "tile 1 is now showing"
tiles:
- id: tile_1
layout: vertical
row: 0
column: 0
dir: ALL
@@ -554,6 +562,7 @@ lvgl:
bg_color: 0x000000
- id: page2
layout: vertical
widgets:
- canvas:
id: canvas_id
@@ -1005,6 +1014,7 @@ lvgl:
r_mod: -20
opa: 0%
- id: page3
layout: horizontal
widgets:
- keyboard:
id: lv_keyboard

View File

@@ -9,3 +9,4 @@ display:
lvgl:
psram:
mode: quad

View File

@@ -543,6 +543,7 @@ def test_main_filters_components_without_tests(
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(helpers, "create_components_graph", return_value={}),
patch("sys.argv", ["determine-jobs.py"]),
patch.object(
determine_jobs,
@@ -640,6 +641,7 @@ def test_main_detects_components_with_variant_tests(
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(helpers, "create_components_graph", return_value={}),
patch("sys.argv", ["determine-jobs.py"]),
patch.object(
determine_jobs,

View File

@@ -1,5 +1,6 @@
"""Unit tests for script/helpers.py module."""
from collections.abc import Generator
import json
import os
from pathlib import Path
@@ -1093,6 +1094,11 @@ def test_parse_list_components_output(output: str, expected: list[str]) -> None:
("tests/components/", None), # No component name
("esphome/components", None), # No trailing slash
("tests/components", None), # No trailing slash
# Files in component directories that are not components
("tests/components/.gitignore", None), # Hidden file
("tests/components/README.md", None), # Documentation file
("esphome/components/__init__.py", None), # Python init file
("tests/components/main.cpp", None), # File with extension
],
)
def test_get_component_from_path(
@@ -1101,3 +1107,262 @@ def test_get_component_from_path(
"""Test extraction of component names from file paths."""
result = helpers.get_component_from_path(file_path)
assert result == expected_component
# Components graph cache tests
@pytest.fixture
def mock_git_output() -> str:
"""Fixture for mock git ls-files output with realistic component files.
Includes examples of AUTO_LOAD in sensor.py and binary_sensor.py files,
which is why we need to hash all .py files, not just __init__.py.
"""
return (
"100644 abc123... 0 esphome/components/wifi/__init__.py\n"
"100644 def456... 0 esphome/components/api/__init__.py\n"
"100644 ghi789... 0 esphome/components/xiaomi_lywsd03mmc/__init__.py\n"
"100644 jkl012... 0 esphome/components/xiaomi_lywsd03mmc/sensor.py\n"
"100644 mno345... 0 esphome/components/xiaomi_cgpr1/__init__.py\n"
"100644 pqr678... 0 esphome/components/xiaomi_cgpr1/binary_sensor.py\n"
)
@pytest.fixture
def mock_cache_file(tmp_path: Path) -> Path:
"""Fixture for a temporary cache file path."""
return tmp_path / "components_graph.json"
@pytest.fixture(autouse=True)
def clear_cache_key_cache() -> None:
"""Clear the components graph cache key cache before each test."""
helpers.get_components_graph_cache_key.cache_clear()
@pytest.fixture
def mock_subprocess_run() -> Generator[Mock, None, None]:
"""Fixture to mock subprocess.run for git commands."""
with patch("subprocess.run") as mock_run:
yield mock_run
def test_cache_key_generation(mock_git_output: str, mock_subprocess_run: Mock) -> None:
"""Test that cache key is generated based on git file hashes."""
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
key = helpers.get_components_graph_cache_key()
# Should be a 64-character hex string (SHA256)
assert len(key) == 64
assert all(c in "0123456789abcdef" for c in key)
def test_cache_key_consistent_for_same_files(
mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that same git output produces same cache key."""
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
key1 = helpers.get_components_graph_cache_key()
key2 = helpers.get_components_graph_cache_key()
assert key1 == key2
def test_cache_key_different_for_changed_files(mock_subprocess_run: Mock) -> None:
"""Test that different git output produces different cache key.
This test demonstrates that changes to any .py file (not just __init__.py)
will invalidate the cache, which is important because AUTO_LOAD can be
defined in sensor.py, binary_sensor.py, etc.
"""
mock_result1 = Mock()
mock_result1.stdout = (
"100644 abc123... 0 esphome/components/xiaomi_lywsd03mmc/sensor.py\n"
)
mock_result2 = Mock()
# Same file, different hash - simulates a change to AUTO_LOAD
mock_result2.stdout = (
"100644 xyz789... 0 esphome/components/xiaomi_lywsd03mmc/sensor.py\n"
)
mock_subprocess_run.return_value = mock_result1
key1 = helpers.get_components_graph_cache_key()
helpers.get_components_graph_cache_key.cache_clear()
mock_subprocess_run.return_value = mock_result2
key2 = helpers.get_components_graph_cache_key()
assert key1 != key2
def test_cache_key_uses_git_ls_files(
mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that git ls-files command is called correctly."""
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
helpers.get_components_graph_cache_key()
# Verify git ls-files was called with correct arguments
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[0][0] == [
"git",
"ls-files",
"-s",
"esphome/components/**/*.py",
]
assert call_args[1]["capture_output"] is True
assert call_args[1]["text"] is True
assert call_args[1]["check"] is True
assert call_args[1]["close_fds"] is False
def test_cache_hit_returns_cached_graph(
tmp_path: Path, mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that cache hit returns cached data without rebuilding."""
mock_graph = {"wifi": ["network"], "api": ["socket"]}
cache_key = "a" * 64
cache_data = {
"_version": helpers.COMPONENTS_GRAPH_CACHE_VERSION,
"_cache_key": cache_key,
"graph": mock_graph,
}
# Write cache file
cache_file = tmp_path / "components_graph.json"
cache_file.write_text(json.dumps(cache_data))
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
with (
patch("helpers.get_components_graph_cache_key", return_value=cache_key),
patch("helpers.temp_folder", str(tmp_path)),
):
result = helpers.create_components_graph()
assert result == mock_graph
def test_cache_miss_no_cache_file(
tmp_path: Path, mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that cache miss rebuilds graph when no cache file exists."""
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
# Create minimal components directory structure
components_dir = tmp_path / "esphome" / "components"
components_dir.mkdir(parents=True)
with (
patch("helpers.root_path", str(tmp_path)),
patch("helpers.temp_folder", str(tmp_path / ".temp")),
patch("helpers.get_components_graph_cache_key", return_value="test_key"),
):
result = helpers.create_components_graph()
# Should return empty graph for empty components directory
assert result == {}
def test_cache_miss_version_mismatch(
tmp_path: Path, mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that cache miss rebuilds graph when version doesn't match."""
cache_data = {
"_version": 999, # Wrong version
"_cache_key": "test_key",
"graph": {"old": ["data"]},
}
cache_file = tmp_path / ".temp" / "components_graph.json"
cache_file.parent.mkdir(parents=True)
cache_file.write_text(json.dumps(cache_data))
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
# Create minimal components directory structure
components_dir = tmp_path / "esphome" / "components"
components_dir.mkdir(parents=True)
with (
patch("helpers.root_path", str(tmp_path)),
patch("helpers.temp_folder", str(tmp_path / ".temp")),
patch("helpers.get_components_graph_cache_key", return_value="test_key"),
):
result = helpers.create_components_graph()
# Should rebuild and return empty graph, not use cached data
assert result == {}
def test_cache_miss_key_mismatch(
tmp_path: Path, mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that cache miss rebuilds graph when cache key doesn't match."""
cache_data = {
"_version": helpers.COMPONENTS_GRAPH_CACHE_VERSION,
"_cache_key": "old_key",
"graph": {"old": ["data"]},
}
cache_file = tmp_path / ".temp" / "components_graph.json"
cache_file.parent.mkdir(parents=True)
cache_file.write_text(json.dumps(cache_data))
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
# Create minimal components directory structure
components_dir = tmp_path / "esphome" / "components"
components_dir.mkdir(parents=True)
with (
patch("helpers.root_path", str(tmp_path)),
patch("helpers.temp_folder", str(tmp_path / ".temp")),
patch("helpers.get_components_graph_cache_key", return_value="new_key"),
):
result = helpers.create_components_graph()
# Should rebuild and return empty graph, not use cached data with old key
assert result == {}
def test_cache_miss_corrupted_json(
tmp_path: Path, mock_git_output: str, mock_subprocess_run: Mock
) -> None:
"""Test that cache miss rebuilds graph when cache file has invalid JSON."""
cache_file = tmp_path / ".temp" / "components_graph.json"
cache_file.parent.mkdir(parents=True)
cache_file.write_text("{invalid json")
mock_result = Mock()
mock_result.stdout = mock_git_output
mock_subprocess_run.return_value = mock_result
# Create minimal components directory structure
components_dir = tmp_path / "esphome" / "components"
components_dir.mkdir(parents=True)
with (
patch("helpers.root_path", str(tmp_path)),
patch("helpers.temp_folder", str(tmp_path / ".temp")),
patch("helpers.get_components_graph_cache_key", return_value="test_key"),
):
result = helpers.create_components_graph()
# Should handle corruption gracefully and rebuild
assert result == {}

View File

@@ -1,4 +1,6 @@
# I2C bus for camera sensor
psram:
i2c:
- id: i2c_camera_bus
sda: 25