mirror of
https://github.com/esphome/esphome.git
synced 2025-09-13 00:32:20 +01:00
Modbus_controller: Add custom command. (#2680)
This commit is contained in:
@@ -1,10 +1,20 @@
|
||||
import binascii
|
||||
import esphome.codegen as cg
|
||||
import esphome.config_validation as cv
|
||||
from esphome.components import modbus
|
||||
from esphome.const import CONF_ID, CONF_ADDRESS
|
||||
from esphome.const import CONF_ADDRESS, CONF_ID, CONF_NAME, CONF_LAMBDA, CONF_OFFSET
|
||||
from esphome.cpp_helpers import logging
|
||||
from .const import (
|
||||
CONF_BITMASK,
|
||||
CONF_BYTE_OFFSET,
|
||||
CONF_COMMAND_THROTTLE,
|
||||
CONF_CUSTOM_COMMAND,
|
||||
CONF_FORCE_NEW_RANGE,
|
||||
CONF_MODBUS_CONTROLLER_ID,
|
||||
CONF_REGISTER_COUNT,
|
||||
CONF_REGISTER_TYPE,
|
||||
CONF_SKIP_UPDATES,
|
||||
CONF_VALUE_TYPE,
|
||||
)
|
||||
|
||||
CODEOWNERS = ["@martgras"]
|
||||
@@ -37,6 +47,7 @@ MODBUS_FUNCTION_CODE = {
|
||||
ModbusRegisterType_ns = modbus_controller_ns.namespace("ModbusRegisterType")
|
||||
ModbusRegisterType = ModbusRegisterType_ns.enum("ModbusRegisterType")
|
||||
MODBUS_REGISTER_TYPE = {
|
||||
"custom": ModbusRegisterType.CUSTOM,
|
||||
"coil": ModbusRegisterType.COIL,
|
||||
"discrete_input": ModbusRegisterType.DISCRETE_INPUT,
|
||||
"holding": ModbusRegisterType.HOLDING,
|
||||
@@ -95,6 +106,96 @@ CONFIG_SCHEMA = cv.All(
|
||||
)
|
||||
|
||||
|
||||
ModbusItemBaseSchema = cv.Schema(
|
||||
{
|
||||
cv.GenerateID(CONF_MODBUS_CONTROLLER_ID): cv.use_id(ModbusController),
|
||||
cv.Optional(CONF_ADDRESS): cv.positive_int,
|
||||
cv.Optional(CONF_CUSTOM_COMMAND): cv.ensure_list(cv.hex_uint8_t),
|
||||
cv.Exclusive(
|
||||
CONF_OFFSET,
|
||||
"offset",
|
||||
f"{CONF_OFFSET} and {CONF_BYTE_OFFSET} can't be used together",
|
||||
): cv.positive_int,
|
||||
cv.Exclusive(
|
||||
CONF_BYTE_OFFSET,
|
||||
"offset",
|
||||
f"{CONF_OFFSET} and {CONF_BYTE_OFFSET} can't be used together",
|
||||
): cv.positive_int,
|
||||
cv.Optional(CONF_BITMASK, default=0xFFFFFFFF): cv.hex_uint32_t,
|
||||
cv.Optional(CONF_SKIP_UPDATES, default=0): cv.positive_int,
|
||||
cv.Optional(CONF_FORCE_NEW_RANGE, default=False): cv.boolean,
|
||||
cv.Optional(CONF_LAMBDA): cv.returning_lambda,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def validate_modbus_register(config):
|
||||
if CONF_CUSTOM_COMMAND not in config and CONF_ADDRESS not in config:
|
||||
raise cv.Invalid(
|
||||
f" {CONF_ADDRESS} is a required property if '{CONF_CUSTOM_COMMAND}:' isn't used"
|
||||
)
|
||||
if CONF_CUSTOM_COMMAND in config and CONF_REGISTER_TYPE in config:
|
||||
raise cv.Invalid(
|
||||
f"can't use '{CONF_REGISTER_TYPE}:' together with '{CONF_CUSTOM_COMMAND}:'",
|
||||
)
|
||||
|
||||
if CONF_CUSTOM_COMMAND not in config and CONF_REGISTER_TYPE not in config:
|
||||
raise cv.Invalid(
|
||||
f" {CONF_REGISTER_TYPE} is a required property if '{CONF_CUSTOM_COMMAND}:' isn't used"
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
def modbus_calc_properties(config):
|
||||
byte_offset = 0
|
||||
reg_count = 0
|
||||
if CONF_OFFSET in config:
|
||||
byte_offset = config[CONF_OFFSET]
|
||||
# A CONF_BYTE_OFFSET setting overrides CONF_OFFSET
|
||||
if CONF_BYTE_OFFSET in config:
|
||||
byte_offset = config[CONF_BYTE_OFFSET]
|
||||
if CONF_REGISTER_COUNT in config:
|
||||
reg_count = config[CONF_REGISTER_COUNT]
|
||||
if CONF_VALUE_TYPE in config:
|
||||
value_type = config[CONF_VALUE_TYPE]
|
||||
if reg_count == 0:
|
||||
reg_count = TYPE_REGISTER_MAP[value_type]
|
||||
if CONF_CUSTOM_COMMAND in config:
|
||||
if CONF_ADDRESS not in config:
|
||||
# generate a unique modbus address using the hash of the name
|
||||
# CONF_NAME set even if only CONF_ID is used.
|
||||
# a modbus register address is required to add the item to sensormap
|
||||
value = config[CONF_NAME]
|
||||
if isinstance(value, str):
|
||||
value = value.encode()
|
||||
config[CONF_ADDRESS] = binascii.crc_hqx(value, 0)
|
||||
config[CONF_REGISTER_TYPE] = ModbusRegisterType.CUSTOM
|
||||
config[CONF_FORCE_NEW_RANGE] = True
|
||||
return byte_offset, reg_count
|
||||
|
||||
|
||||
async def add_modbus_base_properties(
|
||||
var, config, sensor_type, lamdba_param_type=cg.float_, lamdba_return_type=float
|
||||
):
|
||||
if CONF_CUSTOM_COMMAND in config:
|
||||
cg.add(var.set_custom_data(config[CONF_CUSTOM_COMMAND]))
|
||||
|
||||
if CONF_LAMBDA in config:
|
||||
template_ = await cg.process_lambda(
|
||||
config[CONF_LAMBDA],
|
||||
[
|
||||
(sensor_type.operator("ptr"), "item"),
|
||||
(lamdba_param_type, "x"),
|
||||
(
|
||||
cg.std_vector.template(cg.uint8).operator("const").operator("ref"),
|
||||
"data",
|
||||
),
|
||||
],
|
||||
return_type=cg.optional.template(lamdba_return_type),
|
||||
)
|
||||
cg.add(var.set_template(template_))
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
var = cg.new_Pvariable(config[CONF_ID], config[CONF_COMMAND_THROTTLE])
|
||||
cg.add(var.set_command_throttle(config[CONF_COMMAND_THROTTLE]))
|
||||
@@ -119,11 +220,3 @@ def function_code_to_register(function_code):
|
||||
"write_multiple_registers": ModbusRegisterType.HOLDING,
|
||||
}
|
||||
return FUNCTION_CODE_TYPE_MAP[function_code]
|
||||
|
||||
|
||||
def find_by_value(dict, find_value):
|
||||
for (key, value) in MODBUS_REGISTER_TYPE.items():
|
||||
print(find_value, value)
|
||||
if find_value == value:
|
||||
return key
|
||||
return "not found"
|
||||
|
Reference in New Issue
Block a user