mirror of
https://github.com/esphome/esphome.git
synced 2025-10-29 14:13:51 +00:00
Add OpenTherm component (part 2.1: sensor platform) (#7529)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
This commit is contained in:
140
esphome/components/opentherm/generate.py
Normal file
140
esphome/components/opentherm/generate.py
Normal file
@@ -0,0 +1,140 @@
|
||||
from collections.abc import Awaitable
|
||||
from typing import Any, Callable
|
||||
|
||||
import esphome.codegen as cg
|
||||
from esphome.const import CONF_ID
|
||||
from . import const
|
||||
from .schema import TSchema
|
||||
|
||||
opentherm_ns = cg.esphome_ns.namespace("opentherm")
|
||||
OpenthermHub = opentherm_ns.class_("OpenthermHub", cg.Component)
|
||||
|
||||
|
||||
def define_has_component(component_type: str, keys: list[str]) -> None:
|
||||
cg.add_define(
|
||||
f"OPENTHERM_{component_type.upper()}_LIST(F, sep)",
|
||||
cg.RawExpression(
|
||||
" sep ".join(map(lambda key: f"F({key}_{component_type.lower()})", keys))
|
||||
),
|
||||
)
|
||||
for key in keys:
|
||||
cg.add_define(f"OPENTHERM_HAS_{component_type.upper()}_{key}")
|
||||
|
||||
|
||||
def define_message_handler(
|
||||
component_type: str, keys: list[str], schemas: dict[str, TSchema]
|
||||
) -> None:
|
||||
# The macros defined here should be able to generate things like this:
|
||||
# // Parsing a message and publishing to sensors
|
||||
# case MessageId::Message:
|
||||
# // Can have multiple sensors here, for example for a Status message with multiple flags
|
||||
# this->thing_binary_sensor->publish_state(parse_flag8_lb_0(response));
|
||||
# this->other_binary_sensor->publish_state(parse_flag8_lb_1(response));
|
||||
# break;
|
||||
# // Building a message for a write request
|
||||
# case MessageId::Message: {
|
||||
# unsigned int data = 0;
|
||||
# data = write_flag8_lb_0(some_input_switch->state, data); // Where input_sensor can also be a number/output/switch
|
||||
# data = write_u8_hb(some_number->state, data);
|
||||
# return opentherm_->build_request_(MessageType::WriteData, MessageId::Message, data);
|
||||
# }
|
||||
|
||||
messages: dict[str, list[tuple[str, str]]] = {}
|
||||
for key in keys:
|
||||
msg = schemas[key].message
|
||||
if msg not in messages:
|
||||
messages[msg] = []
|
||||
messages[msg].append((key, schemas[key].message_data))
|
||||
|
||||
cg.add_define(
|
||||
f"OPENTHERM_{component_type.upper()}_MESSAGE_HANDLERS(MESSAGE, ENTITY, entity_sep, postscript, msg_sep)",
|
||||
cg.RawExpression(
|
||||
" msg_sep ".join(
|
||||
[
|
||||
f"MESSAGE({msg}) "
|
||||
+ " entity_sep ".join(
|
||||
[
|
||||
f"ENTITY({key}_{component_type.lower()}, {msg_data})"
|
||||
for key, msg_data in keys
|
||||
]
|
||||
)
|
||||
+ " postscript"
|
||||
for msg, keys in messages.items()
|
||||
]
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def define_readers(component_type: str, keys: list[str]) -> None:
|
||||
for key in keys:
|
||||
cg.add_define(
|
||||
f"OPENTHERM_READ_{key}",
|
||||
cg.RawExpression(f"this->{key}_{component_type.lower()}->state"),
|
||||
)
|
||||
|
||||
|
||||
def add_messages(hub: cg.MockObj, keys: list[str], schemas: dict[str, TSchema]):
|
||||
messages: set[tuple[str, bool]] = set()
|
||||
for key in keys:
|
||||
messages.add((schemas[key].message, schemas[key].keep_updated))
|
||||
for msg, keep_updated in messages:
|
||||
msg_expr = cg.RawExpression(f"esphome::opentherm::MessageId::{msg}")
|
||||
if keep_updated:
|
||||
cg.add(hub.add_repeating_message(msg_expr))
|
||||
else:
|
||||
cg.add(hub.add_initial_message(msg_expr))
|
||||
|
||||
|
||||
def add_property_set(var: cg.MockObj, config_key: str, config: dict[str, Any]) -> None:
|
||||
if config_key in config:
|
||||
cg.add(getattr(var, f"set_{config_key}")(config[config_key]))
|
||||
|
||||
|
||||
Create = Callable[[dict[str, Any], str, cg.MockObj], Awaitable[cg.Pvariable]]
|
||||
|
||||
|
||||
def create_only_conf(
|
||||
create: Callable[[dict[str, Any]], Awaitable[cg.Pvariable]]
|
||||
) -> Create:
|
||||
return lambda conf, _key, _hub: create(conf)
|
||||
|
||||
|
||||
async def component_to_code(
|
||||
component_type: str,
|
||||
schemas: dict[str, TSchema],
|
||||
type: cg.MockObjClass,
|
||||
create: Create,
|
||||
config: dict[str, Any],
|
||||
) -> list[str]:
|
||||
"""Generate the code for each configured component in the schema of a component type.
|
||||
|
||||
Parameters:
|
||||
- component_type: The type of component, e.g. "sensor" or "binary_sensor"
|
||||
- schema_: The schema for that component type, a list of available components
|
||||
- type: The type of the component, e.g. sensor.Sensor or OpenthermOutput
|
||||
- create: A constructor function for the component, which receives the config,
|
||||
the key and the hub and should asynchronously return the new component
|
||||
- config: The configuration for this component type
|
||||
|
||||
Returns: The list of keys for the created components
|
||||
"""
|
||||
cg.add_define(f"OPENTHERM_USE_{component_type.upper()}")
|
||||
|
||||
hub = await cg.get_variable(config[const.CONF_OPENTHERM_ID])
|
||||
|
||||
keys: list[str] = []
|
||||
for key, conf in config.items():
|
||||
if not isinstance(conf, dict):
|
||||
continue
|
||||
id = conf[CONF_ID]
|
||||
if id and id.type == type:
|
||||
entity = await create(conf, key, hub)
|
||||
cg.add(getattr(hub, f"set_{key}_{component_type.lower()}")(entity))
|
||||
keys.append(key)
|
||||
|
||||
define_has_component(component_type, keys)
|
||||
define_message_handler(component_type, keys, schemas)
|
||||
add_messages(hub, keys, schemas)
|
||||
|
||||
return keys
|
||||
Reference in New Issue
Block a user