1
0
mirror of https://github.com/esphome/esphome.git synced 2025-04-15 07:10:33 +01:00
Oleg Tarasov 21cb941bbe
Add OpenTherm component (part 2.1: sensor platform) (#7529)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
2024-10-25 15:00:28 +13:00

141 lines
5.0 KiB
Python

from collections.abc import Awaitable
from typing import Any, Callable
import esphome.codegen as cg
from esphome.const import CONF_ID
from . import const
from .schema import TSchema
opentherm_ns = cg.esphome_ns.namespace("opentherm")
OpenthermHub = opentherm_ns.class_("OpenthermHub", cg.Component)
def define_has_component(component_type: str, keys: list[str]) -> None:
cg.add_define(
f"OPENTHERM_{component_type.upper()}_LIST(F, sep)",
cg.RawExpression(
" sep ".join(map(lambda key: f"F({key}_{component_type.lower()})", keys))
),
)
for key in keys:
cg.add_define(f"OPENTHERM_HAS_{component_type.upper()}_{key}")
def define_message_handler(
component_type: str, keys: list[str], schemas: dict[str, TSchema]
) -> None:
# The macros defined here should be able to generate things like this:
# // Parsing a message and publishing to sensors
# case MessageId::Message:
# // Can have multiple sensors here, for example for a Status message with multiple flags
# this->thing_binary_sensor->publish_state(parse_flag8_lb_0(response));
# this->other_binary_sensor->publish_state(parse_flag8_lb_1(response));
# break;
# // Building a message for a write request
# case MessageId::Message: {
# unsigned int data = 0;
# data = write_flag8_lb_0(some_input_switch->state, data); // Where input_sensor can also be a number/output/switch
# data = write_u8_hb(some_number->state, data);
# return opentherm_->build_request_(MessageType::WriteData, MessageId::Message, data);
# }
messages: dict[str, list[tuple[str, str]]] = {}
for key in keys:
msg = schemas[key].message
if msg not in messages:
messages[msg] = []
messages[msg].append((key, schemas[key].message_data))
cg.add_define(
f"OPENTHERM_{component_type.upper()}_MESSAGE_HANDLERS(MESSAGE, ENTITY, entity_sep, postscript, msg_sep)",
cg.RawExpression(
" msg_sep ".join(
[
f"MESSAGE({msg}) "
+ " entity_sep ".join(
[
f"ENTITY({key}_{component_type.lower()}, {msg_data})"
for key, msg_data in keys
]
)
+ " postscript"
for msg, keys in messages.items()
]
)
),
)
def define_readers(component_type: str, keys: list[str]) -> None:
for key in keys:
cg.add_define(
f"OPENTHERM_READ_{key}",
cg.RawExpression(f"this->{key}_{component_type.lower()}->state"),
)
def add_messages(hub: cg.MockObj, keys: list[str], schemas: dict[str, TSchema]):
messages: set[tuple[str, bool]] = set()
for key in keys:
messages.add((schemas[key].message, schemas[key].keep_updated))
for msg, keep_updated in messages:
msg_expr = cg.RawExpression(f"esphome::opentherm::MessageId::{msg}")
if keep_updated:
cg.add(hub.add_repeating_message(msg_expr))
else:
cg.add(hub.add_initial_message(msg_expr))
def add_property_set(var: cg.MockObj, config_key: str, config: dict[str, Any]) -> None:
if config_key in config:
cg.add(getattr(var, f"set_{config_key}")(config[config_key]))
Create = Callable[[dict[str, Any], str, cg.MockObj], Awaitable[cg.Pvariable]]
def create_only_conf(
create: Callable[[dict[str, Any]], Awaitable[cg.Pvariable]]
) -> Create:
return lambda conf, _key, _hub: create(conf)
async def component_to_code(
component_type: str,
schemas: dict[str, TSchema],
type: cg.MockObjClass,
create: Create,
config: dict[str, Any],
) -> list[str]:
"""Generate the code for each configured component in the schema of a component type.
Parameters:
- component_type: The type of component, e.g. "sensor" or "binary_sensor"
- schema_: The schema for that component type, a list of available components
- type: The type of the component, e.g. sensor.Sensor or OpenthermOutput
- create: A constructor function for the component, which receives the config,
the key and the hub and should asynchronously return the new component
- config: The configuration for this component type
Returns: The list of keys for the created components
"""
cg.add_define(f"OPENTHERM_USE_{component_type.upper()}")
hub = await cg.get_variable(config[const.CONF_OPENTHERM_ID])
keys: list[str] = []
for key, conf in config.items():
if not isinstance(conf, dict):
continue
id = conf[CONF_ID]
if id and id.type == type:
entity = await create(conf, key, hub)
cg.add(getattr(hub, f"set_{key}_{component_type.lower()}")(entity))
keys.append(key)
define_has_component(component_type, keys)
define_message_handler(component_type, keys, schemas)
add_messages(hub, keys, schemas)
return keys