mirror of
https://github.com/esphome/esphome.git
synced 2025-04-07 11:20:28 +01:00
* schema dump idea accept boolean or anything default accept null also for full dicts added some common validators more simple validators support multi_conf better handle automations updates updates handle lists removed not needed class move to own folder generalized for automations lists, etc updates updates clean up clean up fix automations made comment optional basic docs support added more docs fixes docs handling updates updates fix components parent updates updates updates Fix inkplate 6 registration updates Disable logging for vscode add on better handle buses keep extended order as in CONFIGs updates updates updates disable comments moved to scripts/build_jsonschema added configurable decorators path handling fix handle list_schema fixes and cleanup add jschema_extractor to maybe updates lint no schema in git add generated loggers list * lint
148 lines
5.1 KiB
Python
148 lines
5.1 KiB
Python
import esphome.codegen as cg
|
|
import esphome.config_validation as cv
|
|
from esphome import automation
|
|
from esphome.core import CORE, coroutine
|
|
from esphome.const import CONF_ID, CONF_TRIGGER_ID, CONF_DATA
|
|
|
|
CODEOWNERS = ["@mvturnho", "@danielschramm"]
|
|
IS_PLATFORM_COMPONENT = True
|
|
|
|
CONF_CAN_ID = "can_id"
|
|
CONF_USE_EXTENDED_ID = "use_extended_id"
|
|
CONF_CANBUS_ID = "canbus_id"
|
|
CONF_BIT_RATE = "bit_rate"
|
|
CONF_ON_FRAME = "on_frame"
|
|
|
|
|
|
def validate_id(id_value, id_ext):
|
|
if not id_ext:
|
|
if id_value > 0x7FF:
|
|
raise cv.Invalid("Standard IDs must be 11 Bit (0x000-0x7ff / 0-2047)")
|
|
|
|
|
|
def validate_raw_data(value):
|
|
if isinstance(value, str):
|
|
return value.encode("utf-8")
|
|
if isinstance(value, list):
|
|
return cv.Schema([cv.hex_uint8_t])(value)
|
|
raise cv.Invalid(
|
|
"data must either be a string wrapped in quotes or a list of bytes"
|
|
)
|
|
|
|
|
|
canbus_ns = cg.esphome_ns.namespace("canbus")
|
|
CanbusComponent = canbus_ns.class_("CanbusComponent", cg.Component)
|
|
CanbusTrigger = canbus_ns.class_(
|
|
"CanbusTrigger",
|
|
automation.Trigger.template(cg.std_vector.template(cg.uint8)),
|
|
cg.Component,
|
|
)
|
|
CanSpeed = canbus_ns.enum("CAN_SPEED")
|
|
|
|
CAN_SPEEDS = {
|
|
"5KBPS": CanSpeed.CAN_5KBPS,
|
|
"10KBPS": CanSpeed.CAN_10KBPS,
|
|
"20KBPS": CanSpeed.CAN_20KBPS,
|
|
"31K25BPS": CanSpeed.CAN_31K25BPS,
|
|
"33KBPS": CanSpeed.CAN_33KBPS,
|
|
"40KBPS": CanSpeed.CAN_40KBPS,
|
|
"50KBPS": CanSpeed.CAN_50KBPS,
|
|
"80KBPS": CanSpeed.CAN_80KBPS,
|
|
"83K3BPS": CanSpeed.CAN_83K3BPS,
|
|
"95KBPS": CanSpeed.CAN_95KBPS,
|
|
"100KBPS": CanSpeed.CAN_100KBPS,
|
|
"125KBPS": CanSpeed.CAN_125KBPS,
|
|
"200KBPS": CanSpeed.CAN_200KBPS,
|
|
"250KBPS": CanSpeed.CAN_250KBPS,
|
|
"500KBPS": CanSpeed.CAN_500KBPS,
|
|
"1000KBPS": CanSpeed.CAN_1000KBPS,
|
|
}
|
|
|
|
CANBUS_SCHEMA = cv.Schema(
|
|
{
|
|
cv.GenerateID(): cv.declare_id(CanbusComponent),
|
|
cv.Required(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
|
|
cv.Optional(CONF_BIT_RATE, default="125KBPS"): cv.enum(CAN_SPEEDS, upper=True),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
cv.Optional(CONF_ON_FRAME): automation.validate_automation(
|
|
{
|
|
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(CanbusTrigger),
|
|
cv.GenerateID(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
cv.Optional(CONF_ON_FRAME): automation.validate_automation(
|
|
{
|
|
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(CanbusTrigger),
|
|
cv.GenerateID(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
}
|
|
),
|
|
}
|
|
),
|
|
}
|
|
).extend(cv.COMPONENT_SCHEMA)
|
|
|
|
|
|
@coroutine
|
|
def setup_canbus_core_(var, config):
|
|
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
|
|
yield cg.register_component(var, config)
|
|
cg.add(var.set_can_id([config[CONF_CAN_ID]]))
|
|
cg.add(var.set_use_extended_id([config[CONF_USE_EXTENDED_ID]]))
|
|
cg.add(var.set_bitrate(CAN_SPEEDS[config[CONF_BIT_RATE]]))
|
|
|
|
for conf in config.get(CONF_ON_FRAME, []):
|
|
can_id = conf[CONF_CAN_ID]
|
|
ext_id = conf[CONF_USE_EXTENDED_ID]
|
|
validate_id(can_id, ext_id)
|
|
trigger = cg.new_Pvariable(conf[CONF_TRIGGER_ID], var, can_id, ext_id)
|
|
yield cg.register_component(trigger, conf)
|
|
yield automation.build_automation(
|
|
trigger, [(cg.std_vector.template(cg.uint8), "x")], conf
|
|
)
|
|
|
|
|
|
@coroutine
|
|
def register_canbus(var, config):
|
|
if not CORE.has_id(config[CONF_ID]):
|
|
var = cg.new_Pvariable(config[CONF_ID], var)
|
|
yield setup_canbus_core_(var, config)
|
|
|
|
|
|
# Actions
|
|
@automation.register_action(
|
|
"canbus.send",
|
|
canbus_ns.class_("CanbusSendAction", automation.Action),
|
|
cv.maybe_simple_value(
|
|
{
|
|
cv.GenerateID(CONF_CANBUS_ID): cv.use_id(CanbusComponent),
|
|
cv.Optional(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
|
|
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
|
|
cv.Required(CONF_DATA): cv.templatable(validate_raw_data),
|
|
},
|
|
key=CONF_DATA,
|
|
),
|
|
)
|
|
def canbus_action_to_code(config, action_id, template_arg, args):
|
|
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
|
|
var = cg.new_Pvariable(action_id, template_arg)
|
|
yield cg.register_parented(var, config[CONF_CANBUS_ID])
|
|
|
|
if CONF_CAN_ID in config:
|
|
can_id = yield cg.templatable(config[CONF_CAN_ID], args, cg.uint32)
|
|
cg.add(var.set_can_id(can_id))
|
|
|
|
use_extended_id = yield cg.templatable(
|
|
config[CONF_USE_EXTENDED_ID], args, cg.uint32
|
|
)
|
|
cg.add(var.set_use_extended_id(use_extended_id))
|
|
|
|
data = config[CONF_DATA]
|
|
if isinstance(data, bytes):
|
|
data = [int(x) for x in data]
|
|
if cg.is_template(data):
|
|
templ = yield cg.templatable(data, args, cg.std_vector.template(cg.uint8))
|
|
cg.add(var.set_data_template(templ))
|
|
else:
|
|
cg.add(var.set_data_static(data))
|
|
yield var
|